Ignore not-yet-defined Portals in pg_cursors view.
[pgsql.git] / contrib / postgres_fdw / postgres_fdw.c
blobfc65d81e2177ada3671a1fd1296280cd64a5e046
/*-------------------------------------------------------------------------
 *
 * postgres_fdw.c
 *		  Foreign-data wrapper for remote PostgreSQL servers
 *
 * Portions Copyright (c) 2012-2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  contrib/postgres_fdw/postgres_fdw.c
 *
 *-------------------------------------------------------------------------
 */
13 #include "postgres.h"
15 #include <limits.h>
17 #include "access/htup_details.h"
18 #include "access/sysattr.h"
19 #include "access/table.h"
20 #include "catalog/pg_class.h"
21 #include "catalog/pg_opfamily.h"
22 #include "commands/defrem.h"
23 #include "commands/explain.h"
24 #include "commands/vacuum.h"
25 #include "executor/execAsync.h"
26 #include "foreign/fdwapi.h"
27 #include "funcapi.h"
28 #include "miscadmin.h"
29 #include "nodes/makefuncs.h"
30 #include "nodes/nodeFuncs.h"
31 #include "optimizer/appendinfo.h"
32 #include "optimizer/clauses.h"
33 #include "optimizer/cost.h"
34 #include "optimizer/inherit.h"
35 #include "optimizer/optimizer.h"
36 #include "optimizer/pathnode.h"
37 #include "optimizer/paths.h"
38 #include "optimizer/planmain.h"
39 #include "optimizer/prep.h"
40 #include "optimizer/restrictinfo.h"
41 #include "optimizer/tlist.h"
42 #include "parser/parsetree.h"
43 #include "postgres_fdw.h"
44 #include "storage/latch.h"
45 #include "utils/builtins.h"
46 #include "utils/float.h"
47 #include "utils/guc.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/rel.h"
51 #include "utils/sampling.h"
52 #include "utils/selfuncs.h"
/* PostgreSQL module magic block for this loadable module. */
PG_MODULE_MAGIC;
/* Default CPU cost to start up a foreign query. */
#define DEFAULT_FDW_STARTUP_COST	100.0

/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
#define DEFAULT_FDW_TUPLE_COST		0.2

/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
/*
 * Indexes of FDW-private information stored in fdw_private lists.
 *
 * These items are indexed with the enum FdwScanPrivateIndex, so an item
 * can be fetched with list_nth().  For example, to get the SELECT statement:
 *		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 *
 * The enumerators are list positions, so their order must not be changed
 * independently of the code that builds the list.
 */
enum FdwScanPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwScanPrivateSelectSql,
	/* Integer list of attribute numbers retrieved by the SELECT */
	FdwScanPrivateRetrievedAttrs,
	/* Integer representing the desired fetch_size */
	FdwScanPrivateFetchSize,

	/*
	 * String describing join i.e. names of relations being joined and types
	 * of join, added when the scan is join
	 */
	FdwScanPrivateRelations,
};
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
 *
 * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
 * 2) Integer list of target attribute numbers for INSERT/UPDATE
 *	  (NIL for a DELETE)
 * 3) Length till the end of VALUES clause for INSERT
 *	  (-1 for a DELETE/UPDATE)
 * 4) Boolean flag showing if the remote query has a RETURNING clause
 * 5) Integer list of attribute numbers retrieved by RETURNING, if any
 */
enum FdwModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwModifyPrivateUpdateSql,
	/* Integer list of target attribute numbers for INSERT/UPDATE */
	FdwModifyPrivateTargetAttnums,
	/* Length till the end of VALUES clause (as an Integer node) */
	FdwModifyPrivateLen,
	/* has-returning flag (as a Boolean node) */
	FdwModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwModifyPrivateRetrievedAttrs,
};
/*
 * Similarly, this enum describes what's kept in the fdw_private list for
 * a ForeignScan node that modifies a foreign table directly.  We store:
 *
 * 1) UPDATE/DELETE statement text to be sent to the remote server
 * 2) Boolean flag showing if the remote query has a RETURNING clause
 * 3) Integer list of attribute numbers retrieved by RETURNING, if any
 * 4) Boolean flag showing if we set the command es_processed
 */
enum FdwDirectModifyPrivateIndex
{
	/* SQL statement to execute remotely (as a String node) */
	FdwDirectModifyPrivateUpdateSql,
	/* has-returning flag (as a Boolean node) */
	FdwDirectModifyPrivateHasReturning,
	/* Integer list of attribute numbers retrieved by RETURNING */
	FdwDirectModifyPrivateRetrievedAttrs,
	/* set-processed flag (as a Boolean node) */
	FdwDirectModifyPrivateSetProcessed,
};
136 * Execution state of a foreign scan using postgres_fdw.
138 typedef struct PgFdwScanState
140 Relation rel; /* relcache entry for the foreign table. NULL
141 * for a foreign join scan. */
142 TupleDesc tupdesc; /* tuple descriptor of scan */
143 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
145 /* extracted fdw_private data */
146 char *query; /* text of SELECT command */
147 List *retrieved_attrs; /* list of retrieved attribute numbers */
149 /* for remote query execution */
150 PGconn *conn; /* connection for the scan */
151 PgFdwConnState *conn_state; /* extra per-connection state */
152 unsigned int cursor_number; /* quasi-unique ID for my cursor */
153 bool cursor_exists; /* have we created the cursor? */
154 int numParams; /* number of parameters passed to query */
155 FmgrInfo *param_flinfo; /* output conversion functions for them */
156 List *param_exprs; /* executable expressions for param values */
157 const char **param_values; /* textual values of query parameters */
159 /* for storing result tuples */
160 HeapTuple *tuples; /* array of currently-retrieved tuples */
161 int num_tuples; /* # of tuples in array */
162 int next_tuple; /* index of next one to return */
164 /* batch-level state, for optimizing rewinds and avoiding useless fetch */
165 int fetch_ct_2; /* Min(# of fetches done, 2) */
166 bool eof_reached; /* true if last fetch reached EOF */
168 /* for asynchronous execution */
169 bool async_capable; /* engage asynchronous-capable logic? */
171 /* working memory contexts */
172 MemoryContext batch_cxt; /* context holding current batch of tuples */
173 MemoryContext temp_cxt; /* context for per-tuple temporary data */
175 int fetch_size; /* number of tuples per fetch */
176 } PgFdwScanState;
179 * Execution state of a foreign insert/update/delete operation.
181 typedef struct PgFdwModifyState
183 Relation rel; /* relcache entry for the foreign table */
184 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
186 /* for remote query execution */
187 PGconn *conn; /* connection for the scan */
188 PgFdwConnState *conn_state; /* extra per-connection state */
189 char *p_name; /* name of prepared statement, if created */
191 /* extracted fdw_private data */
192 char *query; /* text of INSERT/UPDATE/DELETE command */
193 char *orig_query; /* original text of INSERT command */
194 List *target_attrs; /* list of target attribute numbers */
195 int values_end; /* length up to the end of VALUES */
196 int batch_size; /* value of FDW option "batch_size" */
197 bool has_returning; /* is there a RETURNING clause? */
198 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
200 /* info about parameters for prepared statement */
201 AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
202 int p_nums; /* number of parameters to transmit */
203 FmgrInfo *p_flinfo; /* output conversion functions for them */
205 /* batch operation stuff */
206 int num_slots; /* number of slots to insert */
208 /* working memory context */
209 MemoryContext temp_cxt; /* context for per-tuple temporary data */
211 /* for update row movement if subplan result rel */
212 struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
213 * created */
214 } PgFdwModifyState;
217 * Execution state of a foreign scan that modifies a foreign table directly.
219 typedef struct PgFdwDirectModifyState
221 Relation rel; /* relcache entry for the foreign table */
222 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
224 /* extracted fdw_private data */
225 char *query; /* text of UPDATE/DELETE command */
226 bool has_returning; /* is there a RETURNING clause? */
227 List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
228 bool set_processed; /* do we set the command es_processed? */
230 /* for remote query execution */
231 PGconn *conn; /* connection for the update */
232 PgFdwConnState *conn_state; /* extra per-connection state */
233 int numParams; /* number of parameters passed to query */
234 FmgrInfo *param_flinfo; /* output conversion functions for them */
235 List *param_exprs; /* executable expressions for param values */
236 const char **param_values; /* textual values of query parameters */
238 /* for storing result tuples */
239 PGresult *result; /* result for query */
240 int num_tuples; /* # of result tuples */
241 int next_tuple; /* index of next one to return */
242 Relation resultRel; /* relcache entry for the target relation */
243 AttrNumber *attnoMap; /* array of attnums of input user columns */
244 AttrNumber ctidAttno; /* attnum of input ctid column */
245 AttrNumber oidAttno; /* attnum of input oid column */
246 bool hasSystemCols; /* are there system columns of resultRel? */
248 /* working memory context */
249 MemoryContext temp_cxt; /* context for per-tuple temporary data */
250 } PgFdwDirectModifyState;
253 * Workspace for analyzing a foreign table.
255 typedef struct PgFdwAnalyzeState
257 Relation rel; /* relcache entry for the foreign table */
258 AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
259 List *retrieved_attrs; /* attr numbers retrieved by query */
261 /* collected sample rows */
262 HeapTuple *rows; /* array of size targrows */
263 int targrows; /* target # of sample rows */
264 int numrows; /* # of sample rows collected */
266 /* for random sampling */
267 double samplerows; /* # of rows fetched */
268 double rowstoskip; /* # of rows to skip before next sample */
269 ReservoirStateData rstate; /* state for reservoir sampling */
271 /* working memory contexts */
272 MemoryContext anl_cxt; /* context for per-analyze lifespan data */
273 MemoryContext temp_cxt; /* context for per-tuple temporary data */
274 } PgFdwAnalyzeState;
/*
 * This enum describes what's kept in the fdw_private list for a ForeignPath.
 * We store:
 *
 * 1) Boolean flag showing if the remote query has the final sort
 * 2) Boolean flag showing if the remote query has the LIMIT clause
 */
enum FdwPathPrivateIndex
{
	/* has-final-sort flag (as a Boolean node) */
	FdwPathPrivateHasFinalSort,
	/* has-limit flag (as a Boolean node) */
	FdwPathPrivateHasLimit,
};
291 /* Struct for extra information passed to estimate_path_cost_size() */
292 typedef struct
294 PathTarget *target;
295 bool has_final_sort;
296 bool has_limit;
297 double limit_tuples;
298 int64 count_est;
299 int64 offset_est;
300 } PgFdwPathExtraData;
303 * Identify the attribute where data conversion fails.
305 typedef struct ConversionLocation
307 AttrNumber cur_attno; /* attribute number being processed, or 0 */
308 Relation rel; /* foreign table being processed, or NULL */
309 ForeignScanState *fsstate; /* plan node being processed, or NULL */
310 } ConversionLocation;
312 /* Callback argument for ec_member_matches_foreign */
313 typedef struct
315 Expr *current; /* current expr, or NULL if not yet found */
316 List *already_used; /* expressions already dealt with */
317 } ec_member_foreign_arg;
/*
 * SQL functions
 */
PG_FUNCTION_INFO_V1(postgres_fdw_handler);
325 * FDW callback routines
327 static void postgresGetForeignRelSize(PlannerInfo *root,
328 RelOptInfo *baserel,
329 Oid foreigntableid);
330 static void postgresGetForeignPaths(PlannerInfo *root,
331 RelOptInfo *baserel,
332 Oid foreigntableid);
333 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
334 RelOptInfo *foreignrel,
335 Oid foreigntableid,
336 ForeignPath *best_path,
337 List *tlist,
338 List *scan_clauses,
339 Plan *outer_plan);
340 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
341 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
342 static void postgresReScanForeignScan(ForeignScanState *node);
343 static void postgresEndForeignScan(ForeignScanState *node);
344 static void postgresAddForeignUpdateTargets(PlannerInfo *root,
345 Index rtindex,
346 RangeTblEntry *target_rte,
347 Relation target_relation);
348 static List *postgresPlanForeignModify(PlannerInfo *root,
349 ModifyTable *plan,
350 Index resultRelation,
351 int subplan_index);
352 static void postgresBeginForeignModify(ModifyTableState *mtstate,
353 ResultRelInfo *resultRelInfo,
354 List *fdw_private,
355 int subplan_index,
356 int eflags);
357 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
358 ResultRelInfo *resultRelInfo,
359 TupleTableSlot *slot,
360 TupleTableSlot *planSlot);
361 static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate,
362 ResultRelInfo *resultRelInfo,
363 TupleTableSlot **slots,
364 TupleTableSlot **planSlots,
365 int *numSlots);
366 static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
367 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
368 ResultRelInfo *resultRelInfo,
369 TupleTableSlot *slot,
370 TupleTableSlot *planSlot);
371 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
372 ResultRelInfo *resultRelInfo,
373 TupleTableSlot *slot,
374 TupleTableSlot *planSlot);
375 static void postgresEndForeignModify(EState *estate,
376 ResultRelInfo *resultRelInfo);
377 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
378 ResultRelInfo *resultRelInfo);
379 static void postgresEndForeignInsert(EState *estate,
380 ResultRelInfo *resultRelInfo);
381 static int postgresIsForeignRelUpdatable(Relation rel);
382 static bool postgresPlanDirectModify(PlannerInfo *root,
383 ModifyTable *plan,
384 Index resultRelation,
385 int subplan_index);
386 static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
387 static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
388 static void postgresEndDirectModify(ForeignScanState *node);
389 static void postgresExplainForeignScan(ForeignScanState *node,
390 ExplainState *es);
391 static void postgresExplainForeignModify(ModifyTableState *mtstate,
392 ResultRelInfo *rinfo,
393 List *fdw_private,
394 int subplan_index,
395 ExplainState *es);
396 static void postgresExplainDirectModify(ForeignScanState *node,
397 ExplainState *es);
398 static void postgresExecForeignTruncate(List *rels,
399 DropBehavior behavior,
400 bool restart_seqs);
401 static bool postgresAnalyzeForeignTable(Relation relation,
402 AcquireSampleRowsFunc *func,
403 BlockNumber *totalpages);
404 static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
405 Oid serverOid);
406 static void postgresGetForeignJoinPaths(PlannerInfo *root,
407 RelOptInfo *joinrel,
408 RelOptInfo *outerrel,
409 RelOptInfo *innerrel,
410 JoinType jointype,
411 JoinPathExtraData *extra);
412 static bool postgresRecheckForeignScan(ForeignScanState *node,
413 TupleTableSlot *slot);
414 static void postgresGetForeignUpperPaths(PlannerInfo *root,
415 UpperRelationKind stage,
416 RelOptInfo *input_rel,
417 RelOptInfo *output_rel,
418 void *extra);
419 static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
420 static void postgresForeignAsyncRequest(AsyncRequest *areq);
421 static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
422 static void postgresForeignAsyncNotify(AsyncRequest *areq);
425 * Helper functions
427 static void estimate_path_cost_size(PlannerInfo *root,
428 RelOptInfo *foreignrel,
429 List *param_join_conds,
430 List *pathkeys,
431 PgFdwPathExtraData *fpextra,
432 double *p_rows, int *p_width,
433 Cost *p_startup_cost, Cost *p_total_cost);
434 static void get_remote_estimate(const char *sql,
435 PGconn *conn,
436 double *rows,
437 int *width,
438 Cost *startup_cost,
439 Cost *total_cost);
440 static void adjust_foreign_grouping_path_cost(PlannerInfo *root,
441 List *pathkeys,
442 double retrieved_rows,
443 double width,
444 double limit_tuples,
445 Cost *p_startup_cost,
446 Cost *p_run_cost);
447 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
448 EquivalenceClass *ec, EquivalenceMember *em,
449 void *arg);
450 static void create_cursor(ForeignScanState *node);
451 static void fetch_more_data(ForeignScanState *node);
452 static void close_cursor(PGconn *conn, unsigned int cursor_number,
453 PgFdwConnState *conn_state);
454 static PgFdwModifyState *create_foreign_modify(EState *estate,
455 RangeTblEntry *rte,
456 ResultRelInfo *resultRelInfo,
457 CmdType operation,
458 Plan *subplan,
459 char *query,
460 List *target_attrs,
461 int values_end,
462 bool has_returning,
463 List *retrieved_attrs);
464 static TupleTableSlot **execute_foreign_modify(EState *estate,
465 ResultRelInfo *resultRelInfo,
466 CmdType operation,
467 TupleTableSlot **slots,
468 TupleTableSlot **planSlots,
469 int *numSlots);
470 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
471 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
472 ItemPointer tupleid,
473 TupleTableSlot **slots,
474 int numSlots);
475 static void store_returning_result(PgFdwModifyState *fmstate,
476 TupleTableSlot *slot, PGresult *res);
477 static void finish_foreign_modify(PgFdwModifyState *fmstate);
478 static void deallocate_query(PgFdwModifyState *fmstate);
479 static List *build_remote_returning(Index rtindex, Relation rel,
480 List *returningList);
481 static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
482 static void execute_dml_stmt(ForeignScanState *node);
483 static TupleTableSlot *get_returning_data(ForeignScanState *node);
484 static void init_returning_filter(PgFdwDirectModifyState *dmstate,
485 List *fdw_scan_tlist,
486 Index rtindex);
487 static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
488 ResultRelInfo *resultRelInfo,
489 TupleTableSlot *slot,
490 EState *estate);
491 static void prepare_query_params(PlanState *node,
492 List *fdw_exprs,
493 int numParams,
494 FmgrInfo **param_flinfo,
495 List **param_exprs,
496 const char ***param_values);
497 static void process_query_params(ExprContext *econtext,
498 FmgrInfo *param_flinfo,
499 List *param_exprs,
500 const char **param_values);
501 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
502 HeapTuple *rows, int targrows,
503 double *totalrows,
504 double *totaldeadrows);
505 static void analyze_row_processor(PGresult *res, int row,
506 PgFdwAnalyzeState *astate);
507 static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
508 static void fetch_more_data_begin(AsyncRequest *areq);
509 static void complete_pending_request(AsyncRequest *areq);
510 static HeapTuple make_tuple_from_result_row(PGresult *res,
511 int row,
512 Relation rel,
513 AttInMetadata *attinmeta,
514 List *retrieved_attrs,
515 ForeignScanState *fsstate,
516 MemoryContext temp_context);
517 static void conversion_error_callback(void *arg);
518 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
519 JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
520 JoinPathExtraData *extra);
521 static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
522 Node *havingQual);
523 static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
524 RelOptInfo *rel);
525 static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
526 static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
527 Path *epq_path, List *restrictlist);
528 static void add_foreign_grouping_paths(PlannerInfo *root,
529 RelOptInfo *input_rel,
530 RelOptInfo *grouped_rel,
531 GroupPathExtraData *extra);
532 static void add_foreign_ordered_paths(PlannerInfo *root,
533 RelOptInfo *input_rel,
534 RelOptInfo *ordered_rel);
535 static void add_foreign_final_paths(PlannerInfo *root,
536 RelOptInfo *input_rel,
537 RelOptInfo *final_rel,
538 FinalPathExtraData *extra);
539 static void apply_server_options(PgFdwRelationInfo *fpinfo);
540 static void apply_table_options(PgFdwRelationInfo *fpinfo);
541 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
542 const PgFdwRelationInfo *fpinfo_o,
543 const PgFdwRelationInfo *fpinfo_i);
544 static int get_batch_size_option(Relation rel);
548 * Foreign-data wrapper handler function: return a struct with pointers
549 * to my callback routines.
551 Datum
552 postgres_fdw_handler(PG_FUNCTION_ARGS)
554 FdwRoutine *routine = makeNode(FdwRoutine);
556 /* Functions for scanning foreign tables */
557 routine->GetForeignRelSize = postgresGetForeignRelSize;
558 routine->GetForeignPaths = postgresGetForeignPaths;
559 routine->GetForeignPlan = postgresGetForeignPlan;
560 routine->BeginForeignScan = postgresBeginForeignScan;
561 routine->IterateForeignScan = postgresIterateForeignScan;
562 routine->ReScanForeignScan = postgresReScanForeignScan;
563 routine->EndForeignScan = postgresEndForeignScan;
565 /* Functions for updating foreign tables */
566 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
567 routine->PlanForeignModify = postgresPlanForeignModify;
568 routine->BeginForeignModify = postgresBeginForeignModify;
569 routine->ExecForeignInsert = postgresExecForeignInsert;
570 routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert;
571 routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize;
572 routine->ExecForeignUpdate = postgresExecForeignUpdate;
573 routine->ExecForeignDelete = postgresExecForeignDelete;
574 routine->EndForeignModify = postgresEndForeignModify;
575 routine->BeginForeignInsert = postgresBeginForeignInsert;
576 routine->EndForeignInsert = postgresEndForeignInsert;
577 routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
578 routine->PlanDirectModify = postgresPlanDirectModify;
579 routine->BeginDirectModify = postgresBeginDirectModify;
580 routine->IterateDirectModify = postgresIterateDirectModify;
581 routine->EndDirectModify = postgresEndDirectModify;
583 /* Function for EvalPlanQual rechecks */
584 routine->RecheckForeignScan = postgresRecheckForeignScan;
585 /* Support functions for EXPLAIN */
586 routine->ExplainForeignScan = postgresExplainForeignScan;
587 routine->ExplainForeignModify = postgresExplainForeignModify;
588 routine->ExplainDirectModify = postgresExplainDirectModify;
590 /* Support function for TRUNCATE */
591 routine->ExecForeignTruncate = postgresExecForeignTruncate;
593 /* Support functions for ANALYZE */
594 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
596 /* Support functions for IMPORT FOREIGN SCHEMA */
597 routine->ImportForeignSchema = postgresImportForeignSchema;
599 /* Support functions for join push-down */
600 routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
602 /* Support functions for upper relation push-down */
603 routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
605 /* Support functions for asynchronous execution */
606 routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
607 routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
608 routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
609 routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
611 PG_RETURN_POINTER(routine);
615 * postgresGetForeignRelSize
616 * Estimate # of rows and width of the result of the scan
618 * We should consider the effect of all baserestrictinfo clauses here, but
619 * not any join clauses.
621 static void
622 postgresGetForeignRelSize(PlannerInfo *root,
623 RelOptInfo *baserel,
624 Oid foreigntableid)
626 PgFdwRelationInfo *fpinfo;
627 ListCell *lc;
630 * We use PgFdwRelationInfo to pass various information to subsequent
631 * functions.
633 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
634 baserel->fdw_private = (void *) fpinfo;
636 /* Base foreign tables need to be pushed down always. */
637 fpinfo->pushdown_safe = true;
639 /* Look up foreign-table catalog info. */
640 fpinfo->table = GetForeignTable(foreigntableid);
641 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
644 * Extract user-settable option values. Note that per-table settings of
645 * use_remote_estimate, fetch_size and async_capable override per-server
646 * settings of them, respectively.
648 fpinfo->use_remote_estimate = false;
649 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
650 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
651 fpinfo->shippable_extensions = NIL;
652 fpinfo->fetch_size = 100;
653 fpinfo->async_capable = false;
655 apply_server_options(fpinfo);
656 apply_table_options(fpinfo);
659 * If the table or the server is configured to use remote estimates,
660 * identify which user to do remote access as during planning. This
661 * should match what ExecCheckPermissions() does. If we fail due to lack
662 * of permissions, the query would have failed at runtime anyway.
664 if (fpinfo->use_remote_estimate)
666 Oid userid;
668 userid = OidIsValid(baserel->userid) ? baserel->userid : GetUserId();
669 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
671 else
672 fpinfo->user = NULL;
675 * Identify which baserestrictinfo clauses can be sent to the remote
676 * server and which can't.
678 classifyConditions(root, baserel, baserel->baserestrictinfo,
679 &fpinfo->remote_conds, &fpinfo->local_conds);
682 * Identify which attributes will need to be retrieved from the remote
683 * server. These include all attrs needed for joins or final output, plus
684 * all attrs used in the local_conds. (Note: if we end up using a
685 * parameterized scan, it's possible that some of the join clauses will be
686 * sent to the remote and thus we wouldn't really need to retrieve the
687 * columns used in them. Doesn't seem worth detecting that case though.)
689 fpinfo->attrs_used = NULL;
690 pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
691 &fpinfo->attrs_used);
692 foreach(lc, fpinfo->local_conds)
694 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
696 pull_varattnos((Node *) rinfo->clause, baserel->relid,
697 &fpinfo->attrs_used);
701 * Compute the selectivity and cost of the local_conds, so we don't have
702 * to do it over again for each path. The best we can do for these
703 * conditions is to estimate selectivity on the basis of local statistics.
705 fpinfo->local_conds_sel = clauselist_selectivity(root,
706 fpinfo->local_conds,
707 baserel->relid,
708 JOIN_INNER,
709 NULL);
711 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
714 * Set # of retrieved rows and cached relation costs to some negative
715 * value, so that we can detect when they are set to some sensible values,
716 * during one (usually the first) of the calls to estimate_path_cost_size.
718 fpinfo->retrieved_rows = -1;
719 fpinfo->rel_startup_cost = -1;
720 fpinfo->rel_total_cost = -1;
723 * If the table or the server is configured to use remote estimates,
724 * connect to the foreign server and execute EXPLAIN to estimate the
725 * number of rows selected by the restriction clauses, as well as the
726 * average row width. Otherwise, estimate using whatever statistics we
727 * have locally, in a way similar to ordinary tables.
729 if (fpinfo->use_remote_estimate)
732 * Get cost/size estimates with help of remote server. Save the
733 * values in fpinfo so we don't need to do it again to generate the
734 * basic foreign path.
736 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
737 &fpinfo->rows, &fpinfo->width,
738 &fpinfo->startup_cost, &fpinfo->total_cost);
740 /* Report estimated baserel size to planner. */
741 baserel->rows = fpinfo->rows;
742 baserel->reltarget->width = fpinfo->width;
744 else
747 * If the foreign table has never been ANALYZEd, it will have
748 * reltuples < 0, meaning "unknown". We can't do much if we're not
749 * allowed to consult the remote server, but we can use a hack similar
750 * to plancat.c's treatment of empty relations: use a minimum size
751 * estimate of 10 pages, and divide by the column-datatype-based width
752 * estimate to get the corresponding number of tuples.
754 if (baserel->tuples < 0)
756 baserel->pages = 10;
757 baserel->tuples =
758 (10 * BLCKSZ) / (baserel->reltarget->width +
759 MAXALIGN(SizeofHeapTupleHeader));
762 /* Estimate baserel size as best we can with local statistics. */
763 set_baserel_size_estimates(root, baserel);
765 /* Fill in basically-bogus cost estimates for use later. */
766 estimate_path_cost_size(root, baserel, NIL, NIL, NULL,
767 &fpinfo->rows, &fpinfo->width,
768 &fpinfo->startup_cost, &fpinfo->total_cost);
772 * fpinfo->relation_name gets the numeric rangetable index of the foreign
773 * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a
774 * human-readable string at that time.)
776 fpinfo->relation_name = psprintf("%u", baserel->relid);
778 /* No outer and inner relations. */
779 fpinfo->make_outerrel_subquery = false;
780 fpinfo->make_innerrel_subquery = false;
781 fpinfo->lower_subquery_rels = NULL;
782 fpinfo->hidden_subquery_rels = NULL;
783 /* Set the relation index. */
784 fpinfo->relation_index = baserel->relid;
788 * get_useful_ecs_for_relation
789 * Determine which EquivalenceClasses might be involved in useful
790 * orderings of this relation.
792 * This function is in some respects a mirror image of the core function
793 * pathkeys_useful_for_merging: for a regular table, we know what indexes
794 * we have and want to test whether any of them are useful. For a foreign
795 * table, we don't know what indexes are present on the remote side but
796 * want to speculate about which ones we'd like to use if they existed.
798 * This function returns a list of potentially-useful equivalence classes,
799 * but it does not guarantee that an EquivalenceMember exists which contains
800 * Vars only from the given relation. For example, given ft1 JOIN t1 ON
801 * ft1.x + t1.x = 0, this function will say that the equivalence class
802 * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and
803 * t1 is local (or on a different server), it will turn out that no useful
804 * ORDER BY clause can be generated. It's not our job to figure that out
805 * here; we're only interested in identifying relevant ECs.
807 static List *
808 get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel)
810 List *useful_eclass_list = NIL;
811 ListCell *lc;
812 Relids relids;
815 * First, consider whether any active EC is potentially useful for a merge
816 * join against this relation.
818 if (rel->has_eclass_joins)
820 foreach(lc, root->eq_classes)
822 EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc);
824 if (eclass_useful_for_merging(root, cur_ec, rel))
825 useful_eclass_list = lappend(useful_eclass_list, cur_ec);
830 * Next, consider whether there are any non-EC derivable join clauses that
831 * are merge-joinable. If the joininfo list is empty, we can exit
832 * quickly.
834 if (rel->joininfo == NIL)
835 return useful_eclass_list;
837 /* If this is a child rel, we must use the topmost parent rel to search. */
838 if (IS_OTHER_REL(rel))
840 Assert(!bms_is_empty(rel->top_parent_relids));
841 relids = rel->top_parent_relids;
843 else
844 relids = rel->relids;
846 /* Check each join clause in turn. */
847 foreach(lc, rel->joininfo)
849 RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc);
851 /* Consider only mergejoinable clauses */
852 if (restrictinfo->mergeopfamilies == NIL)
853 continue;
855 /* Make sure we've got canonical ECs. */
856 update_mergeclause_eclasses(root, restrictinfo);
859 * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee
860 * that left_ec and right_ec will be initialized, per comments in
861 * distribute_qual_to_rels.
863 * We want to identify which side of this merge-joinable clause
864 * contains columns from the relation produced by this RelOptInfo. We
865 * test for overlap, not containment, because there could be extra
866 * relations on either side. For example, suppose we've got something
867 * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON
868 * A.y = D.y. The input rel might be the joinrel between A and B, and
869 * we'll consider the join clause A.y = D.y. relids contains a
870 * relation not involved in the join class (B) and the equivalence
871 * class for the left-hand side of the clause contains a relation not
872 * involved in the input rel (C). Despite the fact that we have only
873 * overlap and not containment in either direction, A.y is potentially
874 * useful as a sort column.
876 * Note that it's even possible that relids overlaps neither side of
877 * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x
878 * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list,
879 * but overlaps neither side of B. In that case, we just skip this
880 * join clause, since it doesn't suggest a useful sort order for this
881 * relation.
883 if (bms_overlap(relids, restrictinfo->right_ec->ec_relids))
884 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
885 restrictinfo->right_ec);
886 else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids))
887 useful_eclass_list = list_append_unique_ptr(useful_eclass_list,
888 restrictinfo->left_ec);
891 return useful_eclass_list;
895 * get_useful_pathkeys_for_relation
896 * Determine which orderings of a relation might be useful.
898 * Getting data in sorted order can be useful either because the requested
899 * order matches the final output ordering for the overall query we're
900 * planning, or because it enables an efficient merge join. Here, we try
901 * to figure out which pathkeys to consider.
903 static List *
904 get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
906 List *useful_pathkeys_list = NIL;
907 List *useful_eclass_list;
908 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
909 EquivalenceClass *query_ec = NULL;
910 ListCell *lc;
913 * Pushing the query_pathkeys to the remote server is always worth
914 * considering, because it might let us avoid a local sort.
916 fpinfo->qp_is_pushdown_safe = false;
917 if (root->query_pathkeys)
919 bool query_pathkeys_ok = true;
921 foreach(lc, root->query_pathkeys)
923 PathKey *pathkey = (PathKey *) lfirst(lc);
926 * The planner and executor don't have any clever strategy for
927 * taking data sorted by a prefix of the query's pathkeys and
928 * getting it to be sorted by all of those pathkeys. We'll just
929 * end up resorting the entire data set. So, unless we can push
930 * down all of the query pathkeys, forget it.
932 if (!is_foreign_pathkey(root, rel, pathkey))
934 query_pathkeys_ok = false;
935 break;
939 if (query_pathkeys_ok)
941 useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys));
942 fpinfo->qp_is_pushdown_safe = true;
947 * Even if we're not using remote estimates, having the remote side do the
948 * sort generally won't be any worse than doing it locally, and it might
949 * be much better if the remote side can generate data in the right order
950 * without needing a sort at all. However, what we're going to do next is
951 * try to generate pathkeys that seem promising for possible merge joins,
952 * and that's more speculative. A wrong choice might hurt quite a bit, so
953 * bail out if we can't use remote estimates.
955 if (!fpinfo->use_remote_estimate)
956 return useful_pathkeys_list;
958 /* Get the list of interesting EquivalenceClasses. */
959 useful_eclass_list = get_useful_ecs_for_relation(root, rel);
961 /* Extract unique EC for query, if any, so we don't consider it again. */
962 if (list_length(root->query_pathkeys) == 1)
964 PathKey *query_pathkey = linitial(root->query_pathkeys);
966 query_ec = query_pathkey->pk_eclass;
970 * As a heuristic, the only pathkeys we consider here are those of length
971 * one. It's surely possible to consider more, but since each one we
972 * choose to consider will generate a round-trip to the remote side, we
973 * need to be a bit cautious here. It would sure be nice to have a local
974 * cache of information about remote index definitions...
976 foreach(lc, useful_eclass_list)
978 EquivalenceClass *cur_ec = lfirst(lc);
979 PathKey *pathkey;
981 /* If redundant with what we did above, skip it. */
982 if (cur_ec == query_ec)
983 continue;
985 /* Can't push down the sort if the EC's opfamily is not shippable. */
986 if (!is_shippable(linitial_oid(cur_ec->ec_opfamilies),
987 OperatorFamilyRelationId, fpinfo))
988 continue;
990 /* If no pushable expression for this rel, skip it. */
991 if (find_em_for_rel(root, cur_ec, rel) == NULL)
992 continue;
994 /* Looks like we can generate a pathkey, so let's do it. */
995 pathkey = make_canonical_pathkey(root, cur_ec,
996 linitial_oid(cur_ec->ec_opfamilies),
997 BTLessStrategyNumber,
998 false);
999 useful_pathkeys_list = lappend(useful_pathkeys_list,
1000 list_make1(pathkey));
1003 return useful_pathkeys_list;
1007 * postgresGetForeignPaths
1008 * Create possible scan paths for a scan on the foreign table
1010 static void
1011 postgresGetForeignPaths(PlannerInfo *root,
1012 RelOptInfo *baserel,
1013 Oid foreigntableid)
1015 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
1016 ForeignPath *path;
1017 List *ppi_list;
1018 ListCell *lc;
1021 * Create simplest ForeignScan path node and add it to baserel. This path
1022 * corresponds to SeqScan path of regular tables (though depending on what
1023 * baserestrict conditions we were able to send to remote, there might
1024 * actually be an indexscan happening there). We already did all the work
1025 * to estimate cost and size of this path.
1027 * Although this path uses no join clauses, it could still have required
1028 * parameterization due to LATERAL refs in its tlist.
1030 path = create_foreignscan_path(root, baserel,
1031 NULL, /* default pathtarget */
1032 fpinfo->rows,
1033 fpinfo->startup_cost,
1034 fpinfo->total_cost,
1035 NIL, /* no pathkeys */
1036 baserel->lateral_relids,
1037 NULL, /* no extra plan */
1038 NIL, /* no fdw_restrictinfo list */
1039 NIL); /* no fdw_private list */
1040 add_path(baserel, (Path *) path);
1042 /* Add paths with pathkeys */
1043 add_paths_with_pathkeys_for_rel(root, baserel, NULL, NIL);
1046 * If we're not using remote estimates, stop here. We have no way to
1047 * estimate whether any join clauses would be worth sending across, so
1048 * don't bother building parameterized paths.
1050 if (!fpinfo->use_remote_estimate)
1051 return;
1054 * Thumb through all join clauses for the rel to identify which outer
1055 * relations could supply one or more safe-to-send-to-remote join clauses.
1056 * We'll build a parameterized path for each such outer relation.
1058 * It's convenient to manage this by representing each candidate outer
1059 * relation by the ParamPathInfo node for it. We can then use the
1060 * ppi_clauses list in the ParamPathInfo node directly as a list of the
1061 * interesting join clauses for that rel. This takes care of the
1062 * possibility that there are multiple safe join clauses for such a rel,
1063 * and also ensures that we account for unsafe join clauses that we'll
1064 * still have to enforce locally (since the parameterized-path machinery
1065 * insists that we handle all movable clauses).
1067 ppi_list = NIL;
1068 foreach(lc, baserel->joininfo)
1070 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1071 Relids required_outer;
1072 ParamPathInfo *param_info;
1074 /* Check if clause can be moved to this rel */
1075 if (!join_clause_is_movable_to(rinfo, baserel))
1076 continue;
1078 /* See if it is safe to send to remote */
1079 if (!is_foreign_expr(root, baserel, rinfo->clause))
1080 continue;
1082 /* Calculate required outer rels for the resulting path */
1083 required_outer = bms_union(rinfo->clause_relids,
1084 baserel->lateral_relids);
1085 /* We do not want the foreign rel itself listed in required_outer */
1086 required_outer = bms_del_member(required_outer, baserel->relid);
1089 * required_outer probably can't be empty here, but if it were, we
1090 * couldn't make a parameterized path.
1092 if (bms_is_empty(required_outer))
1093 continue;
1095 /* Get the ParamPathInfo */
1096 param_info = get_baserel_parampathinfo(root, baserel,
1097 required_outer);
1098 Assert(param_info != NULL);
1101 * Add it to list unless we already have it. Testing pointer equality
1102 * is OK since get_baserel_parampathinfo won't make duplicates.
1104 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1108 * The above scan examined only "generic" join clauses, not those that
1109 * were absorbed into EquivalenceClauses. See if we can make anything out
1110 * of EquivalenceClauses.
1112 if (baserel->has_eclass_joins)
1115 * We repeatedly scan the eclass list looking for column references
1116 * (or expressions) belonging to the foreign rel. Each time we find
1117 * one, we generate a list of equivalence joinclauses for it, and then
1118 * see if any are safe to send to the remote. Repeat till there are
1119 * no more candidate EC members.
1121 ec_member_foreign_arg arg;
1123 arg.already_used = NIL;
1124 for (;;)
1126 List *clauses;
1128 /* Make clauses, skipping any that join to lateral_referencers */
1129 arg.current = NULL;
1130 clauses = generate_implied_equalities_for_column(root,
1131 baserel,
1132 ec_member_matches_foreign,
1133 (void *) &arg,
1134 baserel->lateral_referencers);
1136 /* Done if there are no more expressions in the foreign rel */
1137 if (arg.current == NULL)
1139 Assert(clauses == NIL);
1140 break;
1143 /* Scan the extracted join clauses */
1144 foreach(lc, clauses)
1146 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
1147 Relids required_outer;
1148 ParamPathInfo *param_info;
1150 /* Check if clause can be moved to this rel */
1151 if (!join_clause_is_movable_to(rinfo, baserel))
1152 continue;
1154 /* See if it is safe to send to remote */
1155 if (!is_foreign_expr(root, baserel, rinfo->clause))
1156 continue;
1158 /* Calculate required outer rels for the resulting path */
1159 required_outer = bms_union(rinfo->clause_relids,
1160 baserel->lateral_relids);
1161 required_outer = bms_del_member(required_outer, baserel->relid);
1162 if (bms_is_empty(required_outer))
1163 continue;
1165 /* Get the ParamPathInfo */
1166 param_info = get_baserel_parampathinfo(root, baserel,
1167 required_outer);
1168 Assert(param_info != NULL);
1170 /* Add it to list unless we already have it */
1171 ppi_list = list_append_unique_ptr(ppi_list, param_info);
1174 /* Try again, now ignoring the expression we found this time */
1175 arg.already_used = lappend(arg.already_used, arg.current);
1180 * Now build a path for each useful outer relation.
1182 foreach(lc, ppi_list)
1184 ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
1185 double rows;
1186 int width;
1187 Cost startup_cost;
1188 Cost total_cost;
1190 /* Get a cost estimate from the remote */
1191 estimate_path_cost_size(root, baserel,
1192 param_info->ppi_clauses, NIL, NULL,
1193 &rows, &width,
1194 &startup_cost, &total_cost);
1197 * ppi_rows currently won't get looked at by anything, but still we
1198 * may as well ensure that it matches our idea of the rowcount.
1200 param_info->ppi_rows = rows;
1202 /* Make the path */
1203 path = create_foreignscan_path(root, baserel,
1204 NULL, /* default pathtarget */
1205 rows,
1206 startup_cost,
1207 total_cost,
1208 NIL, /* no pathkeys */
1209 param_info->ppi_req_outer,
1210 NULL,
1211 NIL, /* no fdw_restrictinfo list */
1212 NIL); /* no fdw_private list */
1213 add_path(baserel, (Path *) path);
1218 * postgresGetForeignPlan
1219 * Create ForeignScan plan node which implements selected best path
1221 static ForeignScan *
1222 postgresGetForeignPlan(PlannerInfo *root,
1223 RelOptInfo *foreignrel,
1224 Oid foreigntableid,
1225 ForeignPath *best_path,
1226 List *tlist,
1227 List *scan_clauses,
1228 Plan *outer_plan)
1230 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
1231 Index scan_relid;
1232 List *fdw_private;
1233 List *remote_exprs = NIL;
1234 List *local_exprs = NIL;
1235 List *params_list = NIL;
1236 List *fdw_scan_tlist = NIL;
1237 List *fdw_recheck_quals = NIL;
1238 List *retrieved_attrs;
1239 StringInfoData sql;
1240 bool has_final_sort = false;
1241 bool has_limit = false;
1242 ListCell *lc;
1245 * Get FDW private data created by postgresGetForeignUpperPaths(), if any.
1247 if (best_path->fdw_private)
1249 has_final_sort = boolVal(list_nth(best_path->fdw_private,
1250 FdwPathPrivateHasFinalSort));
1251 has_limit = boolVal(list_nth(best_path->fdw_private,
1252 FdwPathPrivateHasLimit));
1255 if (IS_SIMPLE_REL(foreignrel))
1258 * For base relations, set scan_relid as the relid of the relation.
1260 scan_relid = foreignrel->relid;
1263 * In a base-relation scan, we must apply the given scan_clauses.
1265 * Separate the scan_clauses into those that can be executed remotely
1266 * and those that can't. baserestrictinfo clauses that were
1267 * previously determined to be safe or unsafe by classifyConditions
1268 * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything
1269 * else in the scan_clauses list will be a join clause, which we have
1270 * to check for remote-safety.
1272 * Note: the join clauses we see here should be the exact same ones
1273 * previously examined by postgresGetForeignPaths. Possibly it'd be
1274 * worth passing forward the classification work done then, rather
1275 * than repeating it here.
1277 * This code must match "extract_actual_clauses(scan_clauses, false)"
1278 * except for the additional decision about remote versus local
1279 * execution.
1281 foreach(lc, scan_clauses)
1283 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
1285 /* Ignore any pseudoconstants, they're dealt with elsewhere */
1286 if (rinfo->pseudoconstant)
1287 continue;
1289 if (list_member_ptr(fpinfo->remote_conds, rinfo))
1290 remote_exprs = lappend(remote_exprs, rinfo->clause);
1291 else if (list_member_ptr(fpinfo->local_conds, rinfo))
1292 local_exprs = lappend(local_exprs, rinfo->clause);
1293 else if (is_foreign_expr(root, foreignrel, rinfo->clause))
1294 remote_exprs = lappend(remote_exprs, rinfo->clause);
1295 else
1296 local_exprs = lappend(local_exprs, rinfo->clause);
1300 * For a base-relation scan, we have to support EPQ recheck, which
1301 * should recheck all the remote quals.
1303 fdw_recheck_quals = remote_exprs;
1305 else
1308 * Join relation or upper relation - set scan_relid to 0.
1310 scan_relid = 0;
1313 * For a join rel, baserestrictinfo is NIL and we are not considering
1314 * parameterization right now, so there should be no scan_clauses for
1315 * a joinrel or an upper rel either.
1317 Assert(!scan_clauses);
1320 * Instead we get the conditions to apply from the fdw_private
1321 * structure.
1323 remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false);
1324 local_exprs = extract_actual_clauses(fpinfo->local_conds, false);
1327 * We leave fdw_recheck_quals empty in this case, since we never need
1328 * to apply EPQ recheck clauses. In the case of a joinrel, EPQ
1329 * recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
1330 * If we're planning an upperrel (ie, remote grouping or aggregation)
1331 * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
1332 * allowed, and indeed we *can't* put the remote clauses into
1333 * fdw_recheck_quals because the unaggregated Vars won't be available
1334 * locally.
1337 /* Build the list of columns to be fetched from the foreign server. */
1338 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
1341 * Ensure that the outer plan produces a tuple whose descriptor
1342 * matches our scan tuple slot. Also, remove the local conditions
1343 * from outer plan's quals, lest they be evaluated twice, once by the
1344 * local plan and once by the scan.
1346 if (outer_plan)
1349 * Right now, we only consider grouping and aggregation beyond
1350 * joins. Queries involving aggregates or grouping do not require
1351 * EPQ mechanism, hence should not have an outer plan here.
1353 Assert(!IS_UPPER_REL(foreignrel));
1356 * First, update the plan's qual list if possible. In some cases
1357 * the quals might be enforced below the topmost plan level, in
1358 * which case we'll fail to remove them; it's not worth working
1359 * harder than this.
1361 foreach(lc, local_exprs)
1363 Node *qual = lfirst(lc);
1365 outer_plan->qual = list_delete(outer_plan->qual, qual);
1368 * For an inner join the local conditions of foreign scan plan
1369 * can be part of the joinquals as well. (They might also be
1370 * in the mergequals or hashquals, but we can't touch those
1371 * without breaking the plan.)
1373 if (IsA(outer_plan, NestLoop) ||
1374 IsA(outer_plan, MergeJoin) ||
1375 IsA(outer_plan, HashJoin))
1377 Join *join_plan = (Join *) outer_plan;
1379 if (join_plan->jointype == JOIN_INNER)
1380 join_plan->joinqual = list_delete(join_plan->joinqual,
1381 qual);
1386 * Now fix the subplan's tlist --- this might result in inserting
1387 * a Result node atop the plan tree.
1389 outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist,
1390 best_path->path.parallel_safe);
1395 * Build the query string to be sent for execution, and identify
1396 * expressions to be sent as parameters.
1398 initStringInfo(&sql);
1399 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
1400 remote_exprs, best_path->path.pathkeys,
1401 has_final_sort, has_limit, false,
1402 &retrieved_attrs, &params_list);
1404 /* Remember remote_exprs for possible use by postgresPlanDirectModify */
1405 fpinfo->final_remote_exprs = remote_exprs;
1408 * Build the fdw_private list that will be available to the executor.
1409 * Items in the list must match order in enum FdwScanPrivateIndex.
1411 fdw_private = list_make3(makeString(sql.data),
1412 retrieved_attrs,
1413 makeInteger(fpinfo->fetch_size));
1414 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
1415 fdw_private = lappend(fdw_private,
1416 makeString(fpinfo->relation_name));
1419 * Create the ForeignScan node for the given relation.
1421 * Note that the remote parameter expressions are stored in the fdw_exprs
1422 * field of the finished plan node; we can't keep them in private state
1423 * because then they wouldn't be subject to later planner processing.
1425 return make_foreignscan(tlist,
1426 local_exprs,
1427 scan_relid,
1428 params_list,
1429 fdw_private,
1430 fdw_scan_tlist,
1431 fdw_recheck_quals,
1432 outer_plan);
1436 * Construct a tuple descriptor for the scan tuples handled by a foreign join.
1438 static TupleDesc
1439 get_tupdesc_for_join_scan_tuples(ForeignScanState *node)
1441 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1442 EState *estate = node->ss.ps.state;
1443 TupleDesc tupdesc;
1446 * The core code has already set up a scan tuple slot based on
1447 * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough,
1448 * but there's one case where it isn't. If we have any whole-row row
1449 * identifier Vars, they may have vartype RECORD, and we need to replace
1450 * that with the associated table's actual composite type. This ensures
1451 * that when we read those ROW() expression values from the remote server,
1452 * we can convert them to a composite type the local server knows.
1454 tupdesc = CreateTupleDescCopy(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1455 for (int i = 0; i < tupdesc->natts; i++)
1457 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
1458 Var *var;
1459 RangeTblEntry *rte;
1460 Oid reltype;
1462 /* Nothing to do if it's not a generic RECORD attribute */
1463 if (att->atttypid != RECORDOID || att->atttypmod >= 0)
1464 continue;
1467 * If we can't identify the referenced table, do nothing. This'll
1468 * likely lead to failure later, but perhaps we can muddle through.
1470 var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
1471 i)->expr;
1472 if (!IsA(var, Var) || var->varattno != 0)
1473 continue;
1474 rte = list_nth(estate->es_range_table, var->varno - 1);
1475 if (rte->rtekind != RTE_RELATION)
1476 continue;
1477 reltype = get_rel_type_id(rte->relid);
1478 if (!OidIsValid(reltype))
1479 continue;
1480 att->atttypid = reltype;
1481 /* shouldn't need to change anything else */
1483 return tupdesc;
1487 * postgresBeginForeignScan
1488 * Initiate an executor scan of a foreign PostgreSQL table.
1490 static void
1491 postgresBeginForeignScan(ForeignScanState *node, int eflags)
1493 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
1494 EState *estate = node->ss.ps.state;
1495 PgFdwScanState *fsstate;
1496 RangeTblEntry *rte;
1497 Oid userid;
1498 ForeignTable *table;
1499 UserMapping *user;
1500 int rtindex;
1501 int numParams;
1504 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
1506 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1507 return;
1510 * We'll save private state in node->fdw_state.
1512 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
1513 node->fdw_state = (void *) fsstate;
1516 * Identify which user to do the remote access as. This should match what
1517 * ExecCheckPermissions() does.
1519 userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
1520 if (fsplan->scan.scanrelid > 0)
1521 rtindex = fsplan->scan.scanrelid;
1522 else
1523 rtindex = bms_next_member(fsplan->fs_base_relids, -1);
1524 rte = exec_rt_fetch(rtindex, estate);
1526 /* Get info about foreign table. */
1527 table = GetForeignTable(rte->relid);
1528 user = GetUserMapping(userid, table->serverid);
1531 * Get connection to the foreign server. Connection manager will
1532 * establish new connection if necessary.
1534 fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
1536 /* Assign a unique ID for my cursor */
1537 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
1538 fsstate->cursor_exists = false;
1540 /* Get private info created by planner functions. */
1541 fsstate->query = strVal(list_nth(fsplan->fdw_private,
1542 FdwScanPrivateSelectSql));
1543 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
1544 FdwScanPrivateRetrievedAttrs);
1545 fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
1546 FdwScanPrivateFetchSize));
1548 /* Create contexts for batches of tuples and per-tuple temp workspace. */
1549 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
1550 "postgres_fdw tuple data",
1551 ALLOCSET_DEFAULT_SIZES);
1552 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1553 "postgres_fdw temporary data",
1554 ALLOCSET_SMALL_SIZES);
1557 * Get info we'll need for converting data fetched from the foreign server
1558 * into local representation and error reporting during that process.
1560 if (fsplan->scan.scanrelid > 0)
1562 fsstate->rel = node->ss.ss_currentRelation;
1563 fsstate->tupdesc = RelationGetDescr(fsstate->rel);
1565 else
1567 fsstate->rel = NULL;
1568 fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
1571 fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
1574 * Prepare for processing of parameters used in remote query, if any.
1576 numParams = list_length(fsplan->fdw_exprs);
1577 fsstate->numParams = numParams;
1578 if (numParams > 0)
1579 prepare_query_params((PlanState *) node,
1580 fsplan->fdw_exprs,
1581 numParams,
1582 &fsstate->param_flinfo,
1583 &fsstate->param_exprs,
1584 &fsstate->param_values);
1586 /* Set the async-capable flag */
1587 fsstate->async_capable = node->ss.ps.async_capable;
1591 * postgresIterateForeignScan
1592 * Retrieve next row from the result set, or clear tuple slot to indicate
1593 * EOF.
1595 static TupleTableSlot *
1596 postgresIterateForeignScan(ForeignScanState *node)
1598 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1599 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
1602 * In sync mode, if this is the first call after Begin or ReScan, we need
1603 * to create the cursor on the remote side. In async mode, we would have
1604 * already created the cursor before we get here, even if this is the
1605 * first call after Begin or ReScan.
1607 if (!fsstate->cursor_exists)
1608 create_cursor(node);
1611 * Get some more tuples, if we've run out.
1613 if (fsstate->next_tuple >= fsstate->num_tuples)
1615 /* In async mode, just clear tuple slot. */
1616 if (fsstate->async_capable)
1617 return ExecClearTuple(slot);
1618 /* No point in another fetch if we already detected EOF, though. */
1619 if (!fsstate->eof_reached)
1620 fetch_more_data(node);
1621 /* If we didn't get any tuples, must be end of data. */
1622 if (fsstate->next_tuple >= fsstate->num_tuples)
1623 return ExecClearTuple(slot);
1627 * Return the next tuple.
1629 ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
1630 slot,
1631 false);
1633 return slot;
1637 * postgresReScanForeignScan
1638 * Restart the scan.
1640 static void
1641 postgresReScanForeignScan(ForeignScanState *node)
1643 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1644 char sql[64];
1645 PGresult *res;
1647 /* If we haven't created the cursor yet, nothing to do. */
1648 if (!fsstate->cursor_exists)
1649 return;
1652 * If the node is async-capable, and an asynchronous fetch for it has
1653 * begun, the asynchronous fetch might not have yet completed. Check if
1654 * the node is async-capable, and an asynchronous fetch for it is still in
1655 * progress; if so, complete the asynchronous fetch before restarting the
1656 * scan.
1658 if (fsstate->async_capable &&
1659 fsstate->conn_state->pendingAreq &&
1660 fsstate->conn_state->pendingAreq->requestee == (PlanState *) node)
1661 fetch_more_data(node);
1664 * If any internal parameters affecting this node have changed, we'd
1665 * better destroy and recreate the cursor. Otherwise, if the remote
1666 * server is v14 or older, rewinding it should be good enough; if not,
1667 * rewind is only allowed for scrollable cursors, but we don't have a way
1668 * to check the scrollability of it, so destroy and recreate it in any
1669 * case. If we've only fetched zero or one batch, we needn't even rewind
1670 * the cursor, just rescan what we have.
1672 if (node->ss.ps.chgParam != NULL)
1674 fsstate->cursor_exists = false;
1675 snprintf(sql, sizeof(sql), "CLOSE c%u",
1676 fsstate->cursor_number);
1678 else if (fsstate->fetch_ct_2 > 1)
1680 if (PQserverVersion(fsstate->conn) < 150000)
1681 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
1682 fsstate->cursor_number);
1683 else
1685 fsstate->cursor_exists = false;
1686 snprintf(sql, sizeof(sql), "CLOSE c%u",
1687 fsstate->cursor_number);
1690 else
1692 /* Easy: just rescan what we already have in memory, if anything */
1693 fsstate->next_tuple = 0;
1694 return;
1698 * We don't use a PG_TRY block here, so be careful not to throw error
1699 * without releasing the PGresult.
1701 res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
1702 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1703 pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
1704 PQclear(res);
1706 /* Now force a fresh FETCH. */
1707 fsstate->tuples = NULL;
1708 fsstate->num_tuples = 0;
1709 fsstate->next_tuple = 0;
1710 fsstate->fetch_ct_2 = 0;
1711 fsstate->eof_reached = false;
1715 * postgresEndForeignScan
1716 * Finish scanning foreign table and dispose objects used for this scan
1718 static void
1719 postgresEndForeignScan(ForeignScanState *node)
1721 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
1723 /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
1724 if (fsstate == NULL)
1725 return;
1727 /* Close the cursor if open, to prevent accumulation of cursors */
1728 if (fsstate->cursor_exists)
1729 close_cursor(fsstate->conn, fsstate->cursor_number,
1730 fsstate->conn_state);
1732 /* Release remote connection */
1733 ReleaseConnection(fsstate->conn);
1734 fsstate->conn = NULL;
1736 /* MemoryContexts will be deleted automatically. */
1740 * postgresAddForeignUpdateTargets
1741 * Add resjunk column(s) needed for update/delete on a foreign table
1743 static void
1744 postgresAddForeignUpdateTargets(PlannerInfo *root,
1745 Index rtindex,
1746 RangeTblEntry *target_rte,
1747 Relation target_relation)
1749 Var *var;
1752 * In postgres_fdw, what we need is the ctid, same as for a regular table.
1755 /* Make a Var representing the desired value */
1756 var = makeVar(rtindex,
1757 SelfItemPointerAttributeNumber,
1758 TIDOID,
1760 InvalidOid,
1763 /* Register it as a row-identity column needed by this target rel */
1764 add_row_identity_var(root, var, rtindex, "ctid");
1768 * postgresPlanForeignModify
1769 * Plan an insert/update/delete operation on a foreign table
1771 static List *
1772 postgresPlanForeignModify(PlannerInfo *root,
1773 ModifyTable *plan,
1774 Index resultRelation,
1775 int subplan_index)
1777 CmdType operation = plan->operation;
1778 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
1779 Relation rel;
1780 StringInfoData sql;
1781 List *targetAttrs = NIL;
1782 List *withCheckOptionList = NIL;
1783 List *returningList = NIL;
1784 List *retrieved_attrs = NIL;
1785 bool doNothing = false;
1786 int values_end_len = -1;
1788 initStringInfo(&sql);
1791 * Core code already has some lock on each rel being planned, so we can
1792 * use NoLock here.
1794 rel = table_open(rte->relid, NoLock);
1797 * In an INSERT, we transmit all columns that are defined in the foreign
1798 * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the
1799 * foreign table, we transmit all columns like INSERT; else we transmit
1800 * only columns that were explicitly targets of the UPDATE, so as to avoid
1801 * unnecessary data transmission. (We can't do that for INSERT since we
1802 * would miss sending default values for columns not listed in the source
1803 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since
1804 * those triggers might change values for non-target columns, in which
1805 * case we would miss sending changed values for those columns.)
1807 if (operation == CMD_INSERT ||
1808 (operation == CMD_UPDATE &&
1809 rel->trigdesc &&
1810 rel->trigdesc->trig_update_before_row))
1812 TupleDesc tupdesc = RelationGetDescr(rel);
1813 int attnum;
1815 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
1817 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1819 if (!attr->attisdropped)
1820 targetAttrs = lappend_int(targetAttrs, attnum);
1823 else if (operation == CMD_UPDATE)
1825 int col;
1826 RelOptInfo *rel = find_base_rel(root, resultRelation);
1827 Bitmapset *allUpdatedCols = get_rel_all_updated_cols(root, rel);
1829 col = -1;
1830 while ((col = bms_next_member(allUpdatedCols, col)) >= 0)
1832 /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
1833 AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
1835 if (attno <= InvalidAttrNumber) /* shouldn't happen */
1836 elog(ERROR, "system-column update is not supported");
1837 targetAttrs = lappend_int(targetAttrs, attno);
1842 * Extract the relevant WITH CHECK OPTION list if any.
1844 if (plan->withCheckOptionLists)
1845 withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists,
1846 subplan_index);
1849 * Extract the relevant RETURNING list if any.
1851 if (plan->returningLists)
1852 returningList = (List *) list_nth(plan->returningLists, subplan_index);
1855 * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification
1856 * should have already been rejected in the optimizer, as presently there
1857 * is no way to recognize an arbiter index on a foreign table. Only DO
1858 * NOTHING is supported without an inference specification.
1860 if (plan->onConflictAction == ONCONFLICT_NOTHING)
1861 doNothing = true;
1862 else if (plan->onConflictAction != ONCONFLICT_NONE)
1863 elog(ERROR, "unexpected ON CONFLICT specification: %d",
1864 (int) plan->onConflictAction);
1867 * Construct the SQL command string.
1869 switch (operation)
1871 case CMD_INSERT:
1872 deparseInsertSql(&sql, rte, resultRelation, rel,
1873 targetAttrs, doNothing,
1874 withCheckOptionList, returningList,
1875 &retrieved_attrs, &values_end_len);
1876 break;
1877 case CMD_UPDATE:
1878 deparseUpdateSql(&sql, rte, resultRelation, rel,
1879 targetAttrs,
1880 withCheckOptionList, returningList,
1881 &retrieved_attrs);
1882 break;
1883 case CMD_DELETE:
1884 deparseDeleteSql(&sql, rte, resultRelation, rel,
1885 returningList,
1886 &retrieved_attrs);
1887 break;
1888 default:
1889 elog(ERROR, "unexpected operation: %d", (int) operation);
1890 break;
1893 table_close(rel, NoLock);
1896 * Build the fdw_private list that will be available to the executor.
1897 * Items in the list must match enum FdwModifyPrivateIndex, above.
1899 return list_make5(makeString(sql.data),
1900 targetAttrs,
1901 makeInteger(values_end_len),
1902 makeBoolean((retrieved_attrs != NIL)),
1903 retrieved_attrs);
1907 * postgresBeginForeignModify
1908 * Begin an insert/update/delete operation on a foreign table
1910 static void
1911 postgresBeginForeignModify(ModifyTableState *mtstate,
1912 ResultRelInfo *resultRelInfo,
1913 List *fdw_private,
1914 int subplan_index,
1915 int eflags)
1917 PgFdwModifyState *fmstate;
1918 char *query;
1919 List *target_attrs;
1920 bool has_returning;
1921 int values_end_len;
1922 List *retrieved_attrs;
1923 RangeTblEntry *rte;
1926 * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
1927 * stays NULL.
1929 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
1930 return;
1932 /* Deconstruct fdw_private data. */
1933 query = strVal(list_nth(fdw_private,
1934 FdwModifyPrivateUpdateSql));
1935 target_attrs = (List *) list_nth(fdw_private,
1936 FdwModifyPrivateTargetAttnums);
1937 values_end_len = intVal(list_nth(fdw_private,
1938 FdwModifyPrivateLen));
1939 has_returning = boolVal(list_nth(fdw_private,
1940 FdwModifyPrivateHasReturning));
1941 retrieved_attrs = (List *) list_nth(fdw_private,
1942 FdwModifyPrivateRetrievedAttrs);
1944 /* Find RTE. */
1945 rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex,
1946 mtstate->ps.state);
1948 /* Construct an execution state. */
1949 fmstate = create_foreign_modify(mtstate->ps.state,
1950 rte,
1951 resultRelInfo,
1952 mtstate->operation,
1953 outerPlanState(mtstate)->plan,
1954 query,
1955 target_attrs,
1956 values_end_len,
1957 has_returning,
1958 retrieved_attrs);
1960 resultRelInfo->ri_FdwState = fmstate;
1964 * postgresExecForeignInsert
1965 * Insert one row into a foreign table
1967 static TupleTableSlot *
1968 postgresExecForeignInsert(EState *estate,
1969 ResultRelInfo *resultRelInfo,
1970 TupleTableSlot *slot,
1971 TupleTableSlot *planSlot)
1973 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
1974 TupleTableSlot **rslot;
1975 int numSlots = 1;
1978 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
1979 * postgresBeginForeignInsert())
1981 if (fmstate->aux_fmstate)
1982 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
1983 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
1984 &slot, &planSlot, &numSlots);
1985 /* Revert that change */
1986 if (fmstate->aux_fmstate)
1987 resultRelInfo->ri_FdwState = fmstate;
1989 return rslot ? *rslot : NULL;
1993 * postgresExecForeignBatchInsert
1994 * Insert multiple rows into a foreign table
1996 static TupleTableSlot **
1997 postgresExecForeignBatchInsert(EState *estate,
1998 ResultRelInfo *resultRelInfo,
1999 TupleTableSlot **slots,
2000 TupleTableSlot **planSlots,
2001 int *numSlots)
2003 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2004 TupleTableSlot **rslot;
2007 * If the fmstate has aux_fmstate set, use the aux_fmstate (see
2008 * postgresBeginForeignInsert())
2010 if (fmstate->aux_fmstate)
2011 resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
2012 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
2013 slots, planSlots, numSlots);
2014 /* Revert that change */
2015 if (fmstate->aux_fmstate)
2016 resultRelInfo->ri_FdwState = fmstate;
2018 return rslot;
2022 * postgresGetForeignModifyBatchSize
2023 * Determine the maximum number of tuples that can be inserted in bulk
2025 * Returns the batch size specified for server or table. When batching is not
2026 * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING
2027 * clause), returns 1.
2029 static int
2030 postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
2032 int batch_size;
2033 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2035 /* should be called only once */
2036 Assert(resultRelInfo->ri_BatchSize == 0);
2039 * Should never get called when the insert is being performed on a table
2040 * that is also among the target relations of an UPDATE operation, because
2041 * postgresBeginForeignInsert() currently rejects such insert attempts.
2043 Assert(fmstate == NULL || fmstate->aux_fmstate == NULL);
2046 * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup
2047 * the option directly in server/table options. Otherwise just use the
2048 * value we determined earlier.
2050 if (fmstate)
2051 batch_size = fmstate->batch_size;
2052 else
2053 batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
2056 * Disable batching when we have to use RETURNING, there are any
2057 * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any
2058 * WITH CHECK OPTION constraints from parent views.
2060 * When there are any BEFORE ROW INSERT triggers on the table, we can't
2061 * support it, because such triggers might query the table we're inserting
2062 * into and act differently if the tuples that have already been processed
2063 * and prepared for insertion are not there.
2065 if (resultRelInfo->ri_projectReturning != NULL ||
2066 resultRelInfo->ri_WithCheckOptions != NIL ||
2067 (resultRelInfo->ri_TrigDesc &&
2068 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2069 resultRelInfo->ri_TrigDesc->trig_insert_after_row)))
2070 return 1;
2073 * If the foreign table has no columns, disable batching as the INSERT
2074 * syntax doesn't allow batching multiple empty rows into a zero-column
2075 * table in a single statement. This is needed for COPY FROM, in which
2076 * case fmstate must be non-NULL.
2078 if (fmstate && list_length(fmstate->target_attrs) == 0)
2079 return 1;
2082 * Otherwise use the batch size specified for server/table. The number of
2083 * parameters in a batch is limited to 65535 (uint16), so make sure we
2084 * don't exceed this limit by using the maximum batch_size possible.
2086 if (fmstate && fmstate->p_nums > 0)
2087 batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums);
2089 return batch_size;
2093 * postgresExecForeignUpdate
2094 * Update one row in a foreign table
2096 static TupleTableSlot *
2097 postgresExecForeignUpdate(EState *estate,
2098 ResultRelInfo *resultRelInfo,
2099 TupleTableSlot *slot,
2100 TupleTableSlot *planSlot)
2102 TupleTableSlot **rslot;
2103 int numSlots = 1;
2105 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
2106 &slot, &planSlot, &numSlots);
2108 return rslot ? rslot[0] : NULL;
2112 * postgresExecForeignDelete
2113 * Delete one row from a foreign table
2115 static TupleTableSlot *
2116 postgresExecForeignDelete(EState *estate,
2117 ResultRelInfo *resultRelInfo,
2118 TupleTableSlot *slot,
2119 TupleTableSlot *planSlot)
2121 TupleTableSlot **rslot;
2122 int numSlots = 1;
2124 rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
2125 &slot, &planSlot, &numSlots);
2127 return rslot ? rslot[0] : NULL;
2131 * postgresEndForeignModify
2132 * Finish an insert/update/delete operation on a foreign table
2134 static void
2135 postgresEndForeignModify(EState *estate,
2136 ResultRelInfo *resultRelInfo)
2138 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2140 /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
2141 if (fmstate == NULL)
2142 return;
2144 /* Destroy the execution state */
2145 finish_foreign_modify(fmstate);
2149 * postgresBeginForeignInsert
2150 * Begin an insert operation on a foreign table
2152 static void
2153 postgresBeginForeignInsert(ModifyTableState *mtstate,
2154 ResultRelInfo *resultRelInfo)
2156 PgFdwModifyState *fmstate;
2157 ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan);
2158 EState *estate = mtstate->ps.state;
2159 Index resultRelation;
2160 Relation rel = resultRelInfo->ri_RelationDesc;
2161 RangeTblEntry *rte;
2162 TupleDesc tupdesc = RelationGetDescr(rel);
2163 int attnum;
2164 int values_end_len;
2165 StringInfoData sql;
2166 List *targetAttrs = NIL;
2167 List *retrieved_attrs = NIL;
2168 bool doNothing = false;
2171 * If the foreign table we are about to insert routed rows into is also an
2172 * UPDATE subplan result rel that will be updated later, proceeding with
2173 * the INSERT will result in the later UPDATE incorrectly modifying those
2174 * routed rows, so prevent the INSERT --- it would be nice if we could
2175 * handle this case; but for now, throw an error for safety.
2177 if (plan && plan->operation == CMD_UPDATE &&
2178 (resultRelInfo->ri_usesFdwDirectModify ||
2179 resultRelInfo->ri_FdwState))
2180 ereport(ERROR,
2181 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2182 errmsg("cannot route tuples into foreign table to be updated \"%s\"",
2183 RelationGetRelationName(rel))));
2185 initStringInfo(&sql);
2187 /* We transmit all columns that are defined in the foreign table. */
2188 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
2190 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
2192 if (!attr->attisdropped)
2193 targetAttrs = lappend_int(targetAttrs, attnum);
2196 /* Check if we add the ON CONFLICT clause to the remote query. */
2197 if (plan)
2199 OnConflictAction onConflictAction = plan->onConflictAction;
2201 /* We only support DO NOTHING without an inference specification. */
2202 if (onConflictAction == ONCONFLICT_NOTHING)
2203 doNothing = true;
2204 else if (onConflictAction != ONCONFLICT_NONE)
2205 elog(ERROR, "unexpected ON CONFLICT specification: %d",
2206 (int) onConflictAction);
2210 * If the foreign table is a partition that doesn't have a corresponding
2211 * RTE entry, we need to create a new RTE describing the foreign table for
2212 * use by deparseInsertSql and create_foreign_modify() below, after first
2213 * copying the parent's RTE and modifying some fields to describe the
2214 * foreign partition to work on. However, if this is invoked by UPDATE,
2215 * the existing RTE may already correspond to this partition if it is one
2216 * of the UPDATE subplan target rels; in that case, we can just use the
2217 * existing RTE as-is.
2219 if (resultRelInfo->ri_RangeTableIndex == 0)
2221 ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo;
2223 rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate);
2224 rte = copyObject(rte);
2225 rte->relid = RelationGetRelid(rel);
2226 rte->relkind = RELKIND_FOREIGN_TABLE;
2229 * For UPDATE, we must use the RT index of the first subplan target
2230 * rel's RTE, because the core code would have built expressions for
2231 * the partition, such as RETURNING, using that RT index as varno of
2232 * Vars contained in those expressions.
2234 if (plan && plan->operation == CMD_UPDATE &&
2235 rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation)
2236 resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex;
2237 else
2238 resultRelation = rootResultRelInfo->ri_RangeTableIndex;
2240 else
2242 resultRelation = resultRelInfo->ri_RangeTableIndex;
2243 rte = exec_rt_fetch(resultRelation, estate);
2246 /* Construct the SQL command string. */
2247 deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
2248 resultRelInfo->ri_WithCheckOptions,
2249 resultRelInfo->ri_returningList,
2250 &retrieved_attrs, &values_end_len);
2252 /* Construct an execution state. */
2253 fmstate = create_foreign_modify(mtstate->ps.state,
2254 rte,
2255 resultRelInfo,
2256 CMD_INSERT,
2257 NULL,
2258 sql.data,
2259 targetAttrs,
2260 values_end_len,
2261 retrieved_attrs != NIL,
2262 retrieved_attrs);
2265 * If the given resultRelInfo already has PgFdwModifyState set, it means
2266 * the foreign table is an UPDATE subplan result rel; in which case, store
2267 * the resulting state into the aux_fmstate of the PgFdwModifyState.
2269 if (resultRelInfo->ri_FdwState)
2271 Assert(plan && plan->operation == CMD_UPDATE);
2272 Assert(resultRelInfo->ri_usesFdwDirectModify == false);
2273 ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate;
2275 else
2276 resultRelInfo->ri_FdwState = fmstate;
2280 * postgresEndForeignInsert
2281 * Finish an insert operation on a foreign table
2283 static void
2284 postgresEndForeignInsert(EState *estate,
2285 ResultRelInfo *resultRelInfo)
2287 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
2289 Assert(fmstate != NULL);
2292 * If the fmstate has aux_fmstate set, get the aux_fmstate (see
2293 * postgresBeginForeignInsert())
2295 if (fmstate->aux_fmstate)
2296 fmstate = fmstate->aux_fmstate;
2298 /* Destroy the execution state */
2299 finish_foreign_modify(fmstate);
2303 * postgresIsForeignRelUpdatable
2304 * Determine whether a foreign table supports INSERT, UPDATE and/or
2305 * DELETE.
2307 static int
2308 postgresIsForeignRelUpdatable(Relation rel)
2310 bool updatable;
2311 ForeignTable *table;
2312 ForeignServer *server;
2313 ListCell *lc;
2316 * By default, all postgres_fdw foreign tables are assumed updatable. This
2317 * can be overridden by a per-server setting, which in turn can be
2318 * overridden by a per-table setting.
2320 updatable = true;
2322 table = GetForeignTable(RelationGetRelid(rel));
2323 server = GetForeignServer(table->serverid);
2325 foreach(lc, server->options)
2327 DefElem *def = (DefElem *) lfirst(lc);
2329 if (strcmp(def->defname, "updatable") == 0)
2330 updatable = defGetBoolean(def);
2332 foreach(lc, table->options)
2334 DefElem *def = (DefElem *) lfirst(lc);
2336 if (strcmp(def->defname, "updatable") == 0)
2337 updatable = defGetBoolean(def);
2341 * Currently "updatable" means support for INSERT, UPDATE and DELETE.
2343 return updatable ?
2344 (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0;
2348 * postgresRecheckForeignScan
2349 * Execute a local join execution plan for a foreign join
2351 static bool
2352 postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
2354 Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
2355 PlanState *outerPlan = outerPlanState(node);
2356 TupleTableSlot *result;
2358 /* For base foreign relations, it suffices to set fdw_recheck_quals */
2359 if (scanrelid > 0)
2360 return true;
2362 Assert(outerPlan != NULL);
2364 /* Execute a local join execution plan */
2365 result = ExecProcNode(outerPlan);
2366 if (TupIsNull(result))
2367 return false;
2369 /* Store result in the given slot */
2370 ExecCopySlot(slot, result);
2372 return true;
2376 * find_modifytable_subplan
2377 * Helper routine for postgresPlanDirectModify to find the
2378 * ModifyTable subplan node that scans the specified RTI.
2380 * Returns NULL if the subplan couldn't be identified. That's not a fatal
2381 * error condition, we just abandon trying to do the update directly.
2383 static ForeignScan *
2384 find_modifytable_subplan(PlannerInfo *root,
2385 ModifyTable *plan,
2386 Index rtindex,
2387 int subplan_index)
2389 Plan *subplan = outerPlan(plan);
2392 * The cases we support are (1) the desired ForeignScan is the immediate
2393 * child of ModifyTable, or (2) it is the subplan_index'th child of an
2394 * Append node that is the immediate child of ModifyTable. There is no
2395 * point in looking further down, as that would mean that local joins are
2396 * involved, so we can't do the update directly.
2398 * There could be a Result atop the Append too, acting to compute the
2399 * UPDATE targetlist values. We ignore that here; the tlist will be
2400 * checked by our caller.
2402 * In principle we could examine all the children of the Append, but it's
2403 * currently unlikely that the core planner would generate such a plan
2404 * with the children out-of-order. Moreover, such a search risks costing
2405 * O(N^2) time when there are a lot of children.
2407 if (IsA(subplan, Append))
2409 Append *appendplan = (Append *) subplan;
2411 if (subplan_index < list_length(appendplan->appendplans))
2412 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2414 else if (IsA(subplan, Result) &&
2415 outerPlan(subplan) != NULL &&
2416 IsA(outerPlan(subplan), Append))
2418 Append *appendplan = (Append *) outerPlan(subplan);
2420 if (subplan_index < list_length(appendplan->appendplans))
2421 subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index);
2424 /* Now, have we got a ForeignScan on the desired rel? */
2425 if (IsA(subplan, ForeignScan))
2427 ForeignScan *fscan = (ForeignScan *) subplan;
2429 if (bms_is_member(rtindex, fscan->fs_base_relids))
2430 return fscan;
2433 return NULL;
2437 * postgresPlanDirectModify
2438 * Consider a direct foreign table modification
2440 * Decide whether it is safe to modify a foreign table directly, and if so,
2441 * rewrite subplan accordingly.
2443 static bool
2444 postgresPlanDirectModify(PlannerInfo *root,
2445 ModifyTable *plan,
2446 Index resultRelation,
2447 int subplan_index)
2449 CmdType operation = plan->operation;
2450 RelOptInfo *foreignrel;
2451 RangeTblEntry *rte;
2452 PgFdwRelationInfo *fpinfo;
2453 Relation rel;
2454 StringInfoData sql;
2455 ForeignScan *fscan;
2456 List *processed_tlist = NIL;
2457 List *targetAttrs = NIL;
2458 List *remote_exprs;
2459 List *params_list = NIL;
2460 List *returningList = NIL;
2461 List *retrieved_attrs = NIL;
2464 * Decide whether it is safe to modify a foreign table directly.
2468 * The table modification must be an UPDATE or DELETE.
2470 if (operation != CMD_UPDATE && operation != CMD_DELETE)
2471 return false;
2474 * Try to locate the ForeignScan subplan that's scanning resultRelation.
2476 fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index);
2477 if (!fscan)
2478 return false;
2481 * It's unsafe to modify a foreign table directly if there are any quals
2482 * that should be evaluated locally.
2484 if (fscan->scan.plan.qual != NIL)
2485 return false;
2487 /* Safe to fetch data about the target foreign rel */
2488 if (fscan->scan.scanrelid == 0)
2490 foreignrel = find_join_rel(root, fscan->fs_relids);
2491 /* We should have a rel for this foreign join. */
2492 Assert(foreignrel);
2494 else
2495 foreignrel = root->simple_rel_array[resultRelation];
2496 rte = root->simple_rte_array[resultRelation];
2497 fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
2500 * It's unsafe to update a foreign table directly, if any expressions to
2501 * assign to the target columns are unsafe to evaluate remotely.
2503 if (operation == CMD_UPDATE)
2505 ListCell *lc,
2506 *lc2;
2509 * The expressions of concern are the first N columns of the processed
2510 * targetlist, where N is the length of the rel's update_colnos.
2512 get_translated_update_targetlist(root, resultRelation,
2513 &processed_tlist, &targetAttrs);
2514 forboth(lc, processed_tlist, lc2, targetAttrs)
2516 TargetEntry *tle = lfirst_node(TargetEntry, lc);
2517 AttrNumber attno = lfirst_int(lc2);
2519 /* update's new-value expressions shouldn't be resjunk */
2520 Assert(!tle->resjunk);
2522 if (attno <= InvalidAttrNumber) /* shouldn't happen */
2523 elog(ERROR, "system-column update is not supported");
2525 if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr))
2526 return false;
2531 * Ok, rewrite subplan so as to modify the foreign table directly.
2533 initStringInfo(&sql);
2536 * Core code already has some lock on each rel being planned, so we can
2537 * use NoLock here.
2539 rel = table_open(rte->relid, NoLock);
2542 * Recall the qual clauses that must be evaluated remotely. (These are
2543 * bare clauses not RestrictInfos, but deparse.c's appendConditions()
2544 * doesn't care.)
2546 remote_exprs = fpinfo->final_remote_exprs;
2549 * Extract the relevant RETURNING list if any.
2551 if (plan->returningLists)
2553 returningList = (List *) list_nth(plan->returningLists, subplan_index);
2556 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2557 * we fetch from the foreign server any Vars specified in RETURNING
2558 * that refer not only to the target relation but to non-target
2559 * relations. So we'll deparse them into the RETURNING clause of the
2560 * remote query; use a targetlist consisting of them instead, which
2561 * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
2562 * node below.
2564 if (fscan->scan.scanrelid == 0)
2565 returningList = build_remote_returning(resultRelation, rel,
2566 returningList);
2570 * Construct the SQL command string.
2572 switch (operation)
2574 case CMD_UPDATE:
2575 deparseDirectUpdateSql(&sql, root, resultRelation, rel,
2576 foreignrel,
2577 processed_tlist,
2578 targetAttrs,
2579 remote_exprs, &params_list,
2580 returningList, &retrieved_attrs);
2581 break;
2582 case CMD_DELETE:
2583 deparseDirectDeleteSql(&sql, root, resultRelation, rel,
2584 foreignrel,
2585 remote_exprs, &params_list,
2586 returningList, &retrieved_attrs);
2587 break;
2588 default:
2589 elog(ERROR, "unexpected operation: %d", (int) operation);
2590 break;
2594 * Update the operation and target relation info.
2596 fscan->operation = operation;
2597 fscan->resultRelation = resultRelation;
2600 * Update the fdw_exprs list that will be available to the executor.
2602 fscan->fdw_exprs = params_list;
2605 * Update the fdw_private list that will be available to the executor.
2606 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
2608 fscan->fdw_private = list_make4(makeString(sql.data),
2609 makeBoolean((retrieved_attrs != NIL)),
2610 retrieved_attrs,
2611 makeBoolean(plan->canSetTag));
2614 * Update the foreign-join-related fields.
2616 if (fscan->scan.scanrelid == 0)
2618 /* No need for the outer subplan. */
2619 fscan->scan.plan.lefttree = NULL;
2621 /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
2622 if (returningList)
2623 rebuild_fdw_scan_tlist(fscan, returningList);
2627 * Finally, unset the async-capable flag if it is set, as we currently
2628 * don't support asynchronous execution of direct modifications.
2630 if (fscan->scan.plan.async_capable)
2631 fscan->scan.plan.async_capable = false;
2633 table_close(rel, NoLock);
2634 return true;
2638 * postgresBeginDirectModify
2639 * Prepare a direct foreign table modification
2641 static void
2642 postgresBeginDirectModify(ForeignScanState *node, int eflags)
2644 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
2645 EState *estate = node->ss.ps.state;
2646 PgFdwDirectModifyState *dmstate;
2647 Index rtindex;
2648 Oid userid;
2649 ForeignTable *table;
2650 UserMapping *user;
2651 int numParams;
2654 * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
2656 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
2657 return;
2660 * We'll save private state in node->fdw_state.
2662 dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
2663 node->fdw_state = (void *) dmstate;
2666 * Identify which user to do the remote access as. This should match what
2667 * ExecCheckPermissions() does.
2669 userid = OidIsValid(fsplan->checkAsUser) ? fsplan->checkAsUser : GetUserId();
2671 /* Get info about foreign table. */
2672 rtindex = node->resultRelInfo->ri_RangeTableIndex;
2673 if (fsplan->scan.scanrelid == 0)
2674 dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
2675 else
2676 dmstate->rel = node->ss.ss_currentRelation;
2677 table = GetForeignTable(RelationGetRelid(dmstate->rel));
2678 user = GetUserMapping(userid, table->serverid);
2681 * Get connection to the foreign server. Connection manager will
2682 * establish new connection if necessary.
2684 dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
2686 /* Update the foreign-join-related fields. */
2687 if (fsplan->scan.scanrelid == 0)
2689 /* Save info about foreign table. */
2690 dmstate->resultRel = dmstate->rel;
2693 * Set dmstate->rel to NULL to teach get_returning_data() and
2694 * make_tuple_from_result_row() that columns fetched from the remote
2695 * server are described by fdw_scan_tlist of the foreign-scan plan
2696 * node, not the tuple descriptor for the target relation.
2698 dmstate->rel = NULL;
2701 /* Initialize state variable */
2702 dmstate->num_tuples = -1; /* -1 means not set yet */
2704 /* Get private info created by planner functions. */
2705 dmstate->query = strVal(list_nth(fsplan->fdw_private,
2706 FdwDirectModifyPrivateUpdateSql));
2707 dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private,
2708 FdwDirectModifyPrivateHasReturning));
2709 dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
2710 FdwDirectModifyPrivateRetrievedAttrs);
2711 dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private,
2712 FdwDirectModifyPrivateSetProcessed));
2714 /* Create context for per-tuple temp workspace. */
2715 dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
2716 "postgres_fdw temporary data",
2717 ALLOCSET_SMALL_SIZES);
2719 /* Prepare for input conversion of RETURNING results. */
2720 if (dmstate->has_returning)
2722 TupleDesc tupdesc;
2724 if (fsplan->scan.scanrelid == 0)
2725 tupdesc = get_tupdesc_for_join_scan_tuples(node);
2726 else
2727 tupdesc = RelationGetDescr(dmstate->rel);
2729 dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
2732 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
2733 * initialize a filter to extract an updated/deleted tuple from a scan
2734 * tuple.
2736 if (fsplan->scan.scanrelid == 0)
2737 init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
2741 * Prepare for processing of parameters used in remote query, if any.
2743 numParams = list_length(fsplan->fdw_exprs);
2744 dmstate->numParams = numParams;
2745 if (numParams > 0)
2746 prepare_query_params((PlanState *) node,
2747 fsplan->fdw_exprs,
2748 numParams,
2749 &dmstate->param_flinfo,
2750 &dmstate->param_exprs,
2751 &dmstate->param_values);
2755 * postgresIterateDirectModify
2756 * Execute a direct foreign table modification
2758 static TupleTableSlot *
2759 postgresIterateDirectModify(ForeignScanState *node)
2761 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2762 EState *estate = node->ss.ps.state;
2763 ResultRelInfo *resultRelInfo = node->resultRelInfo;
2766 * If this is the first call after Begin, execute the statement.
2768 if (dmstate->num_tuples == -1)
2769 execute_dml_stmt(node);
2772 * If the local query doesn't specify RETURNING, just clear tuple slot.
2774 if (!resultRelInfo->ri_projectReturning)
2776 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
2777 Instrumentation *instr = node->ss.ps.instrument;
2779 Assert(!dmstate->has_returning);
2781 /* Increment the command es_processed count if necessary. */
2782 if (dmstate->set_processed)
2783 estate->es_processed += dmstate->num_tuples;
2785 /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
2786 if (instr)
2787 instr->tuplecount += dmstate->num_tuples;
2789 return ExecClearTuple(slot);
2793 * Get the next RETURNING tuple.
2795 return get_returning_data(node);
2799 * postgresEndDirectModify
2800 * Finish a direct foreign table modification
2802 static void
2803 postgresEndDirectModify(ForeignScanState *node)
2805 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
2807 /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
2808 if (dmstate == NULL)
2809 return;
2811 /* Release PGresult */
2812 PQclear(dmstate->result);
2814 /* Release remote connection */
2815 ReleaseConnection(dmstate->conn);
2816 dmstate->conn = NULL;
2818 /* MemoryContext will be deleted automatically. */
2822 * postgresExplainForeignScan
2823 * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
2825 static void
2826 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
2828 ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan);
2829 List *fdw_private = plan->fdw_private;
2832 * Identify foreign scans that are really joins or upper relations. The
2833 * input looks something like "(1) LEFT JOIN (2)", and we must replace the
2834 * digit string(s), which are RT indexes, with the correct relation names.
2835 * We do that here, not when the plan is created, because we can't know
2836 * what aliases ruleutils.c will assign at plan creation time.
2838 if (list_length(fdw_private) > FdwScanPrivateRelations)
2840 StringInfo relations;
2841 char *rawrelations;
2842 char *ptr;
2843 int minrti,
2844 rtoffset;
2846 rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
2849 * A difficulty with using a string representation of RT indexes is
2850 * that setrefs.c won't update the string when flattening the
2851 * rangetable. To find out what rtoffset was applied, identify the
2852 * minimum RT index appearing in the string and compare it to the
2853 * minimum member of plan->fs_base_relids. (We expect all the relids
2854 * in the join will have been offset by the same amount; the Asserts
2855 * below should catch it if that ever changes.)
2857 minrti = INT_MAX;
2858 ptr = rawrelations;
2859 while (*ptr)
2861 if (isdigit((unsigned char) *ptr))
2863 int rti = strtol(ptr, &ptr, 10);
2865 if (rti < minrti)
2866 minrti = rti;
2868 else
2869 ptr++;
2871 rtoffset = bms_next_member(plan->fs_base_relids, -1) - minrti;
2873 /* Now we can translate the string */
2874 relations = makeStringInfo();
2875 ptr = rawrelations;
2876 while (*ptr)
2878 if (isdigit((unsigned char) *ptr))
2880 int rti = strtol(ptr, &ptr, 10);
2881 RangeTblEntry *rte;
2882 char *relname;
2883 char *refname;
2885 rti += rtoffset;
2886 Assert(bms_is_member(rti, plan->fs_base_relids));
2887 rte = rt_fetch(rti, es->rtable);
2888 Assert(rte->rtekind == RTE_RELATION);
2889 /* This logic should agree with explain.c's ExplainTargetRel */
2890 relname = get_rel_name(rte->relid);
2891 if (es->verbose)
2893 char *namespace;
2895 namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid));
2896 appendStringInfo(relations, "%s.%s",
2897 quote_identifier(namespace),
2898 quote_identifier(relname));
2900 else
2901 appendStringInfoString(relations,
2902 quote_identifier(relname));
2903 refname = (char *) list_nth(es->rtable_names, rti - 1);
2904 if (refname == NULL)
2905 refname = rte->eref->aliasname;
2906 if (strcmp(refname, relname) != 0)
2907 appendStringInfo(relations, " %s",
2908 quote_identifier(refname));
2910 else
2911 appendStringInfoChar(relations, *ptr++);
2913 ExplainPropertyText("Relations", relations->data, es);
2917 * Add remote query, when VERBOSE option is specified.
2919 if (es->verbose)
2921 char *sql;
2923 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
2924 ExplainPropertyText("Remote SQL", sql, es);
2929 * postgresExplainForeignModify
2930 * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
2932 static void
2933 postgresExplainForeignModify(ModifyTableState *mtstate,
2934 ResultRelInfo *rinfo,
2935 List *fdw_private,
2936 int subplan_index,
2937 ExplainState *es)
2939 if (es->verbose)
2941 char *sql = strVal(list_nth(fdw_private,
2942 FdwModifyPrivateUpdateSql));
2944 ExplainPropertyText("Remote SQL", sql, es);
2947 * For INSERT we should always have batch size >= 1, but UPDATE and
2948 * DELETE don't support batching so don't show the property.
2950 if (rinfo->ri_BatchSize > 0)
2951 ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
2956 * postgresExplainDirectModify
2957 * Produce extra output for EXPLAIN of a ForeignScan that modifies a
2958 * foreign table directly
2960 static void
2961 postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
2963 List *fdw_private;
2964 char *sql;
2966 if (es->verbose)
2968 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
2969 sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
2970 ExplainPropertyText("Remote SQL", sql, es);
2975 * postgresExecForeignTruncate
2976 * Truncate one or more foreign tables
2978 static void
2979 postgresExecForeignTruncate(List *rels,
2980 DropBehavior behavior,
2981 bool restart_seqs)
2983 Oid serverid = InvalidOid;
2984 UserMapping *user = NULL;
2985 PGconn *conn = NULL;
2986 StringInfoData sql;
2987 ListCell *lc;
2988 bool server_truncatable = true;
2991 * By default, all postgres_fdw foreign tables are assumed truncatable.
2992 * This can be overridden by a per-server setting, which in turn can be
2993 * overridden by a per-table setting.
2995 foreach(lc, rels)
2997 ForeignServer *server = NULL;
2998 Relation rel = lfirst(lc);
2999 ForeignTable *table = GetForeignTable(RelationGetRelid(rel));
3000 ListCell *cell;
3001 bool truncatable;
3004 * First time through, determine whether the foreign server allows
3005 * truncates. Since all specified foreign tables are assumed to belong
3006 * to the same foreign server, this result can be used for other
3007 * foreign tables.
3009 if (!OidIsValid(serverid))
3011 serverid = table->serverid;
3012 server = GetForeignServer(serverid);
3014 foreach(cell, server->options)
3016 DefElem *defel = (DefElem *) lfirst(cell);
3018 if (strcmp(defel->defname, "truncatable") == 0)
3020 server_truncatable = defGetBoolean(defel);
3021 break;
3027 * Confirm that all specified foreign tables belong to the same
3028 * foreign server.
3030 Assert(table->serverid == serverid);
3032 /* Determine whether this foreign table allows truncations */
3033 truncatable = server_truncatable;
3034 foreach(cell, table->options)
3036 DefElem *defel = (DefElem *) lfirst(cell);
3038 if (strcmp(defel->defname, "truncatable") == 0)
3040 truncatable = defGetBoolean(defel);
3041 break;
3045 if (!truncatable)
3046 ereport(ERROR,
3047 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3048 errmsg("foreign table \"%s\" does not allow truncates",
3049 RelationGetRelationName(rel))));
3051 Assert(OidIsValid(serverid));
3054 * Get connection to the foreign server. Connection manager will
3055 * establish new connection if necessary.
3057 user = GetUserMapping(GetUserId(), serverid);
3058 conn = GetConnection(user, false, NULL);
3060 /* Construct the TRUNCATE command string */
3061 initStringInfo(&sql);
3062 deparseTruncateSql(&sql, rels, behavior, restart_seqs);
3064 /* Issue the TRUNCATE command to remote server */
3065 do_sql_command(conn, sql.data);
3067 pfree(sql.data);
3071 * estimate_path_cost_size
3072 * Get cost and size estimates for a foreign scan on given foreign relation
3073 * either a base relation or a join between foreign relations or an upper
3074 * relation containing foreign relations.
3076 * param_join_conds are the parameterization clauses with outer relations.
3077 * pathkeys specify the expected sort order if any for given path being costed.
3078 * fpextra specifies additional post-scan/join-processing steps such as the
3079 * final sort and the LIMIT restriction.
3081 * The function returns the cost and size estimates in p_rows, p_width,
3082 * p_startup_cost and p_total_cost variables.
3084 static void
3085 estimate_path_cost_size(PlannerInfo *root,
3086 RelOptInfo *foreignrel,
3087 List *param_join_conds,
3088 List *pathkeys,
3089 PgFdwPathExtraData *fpextra,
3090 double *p_rows, int *p_width,
3091 Cost *p_startup_cost, Cost *p_total_cost)
3093 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
3094 double rows;
3095 double retrieved_rows;
3096 int width;
3097 Cost startup_cost;
3098 Cost total_cost;
3100 /* Make sure the core code has set up the relation's reltarget */
3101 Assert(foreignrel->reltarget);
3104 * If the table or the server is configured to use remote estimates,
3105 * connect to the foreign server and execute EXPLAIN to estimate the
3106 * number of rows selected by the restriction+join clauses. Otherwise,
3107 * estimate rows using whatever statistics we have locally, in a way
3108 * similar to ordinary tables.
3110 if (fpinfo->use_remote_estimate)
3112 List *remote_param_join_conds;
3113 List *local_param_join_conds;
3114 StringInfoData sql;
3115 PGconn *conn;
3116 Selectivity local_sel;
3117 QualCost local_cost;
3118 List *fdw_scan_tlist = NIL;
3119 List *remote_conds;
3121 /* Required only to be passed to deparseSelectStmtForRel */
3122 List *retrieved_attrs;
3125 * param_join_conds might contain both clauses that are safe to send
3126 * across, and clauses that aren't.
3128 classifyConditions(root, foreignrel, param_join_conds,
3129 &remote_param_join_conds, &local_param_join_conds);
3131 /* Build the list of columns to be fetched from the foreign server. */
3132 if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
3133 fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
3134 else
3135 fdw_scan_tlist = NIL;
3138 * The complete list of remote conditions includes everything from
3139 * baserestrictinfo plus any extra join_conds relevant to this
3140 * particular path.
3142 remote_conds = list_concat(remote_param_join_conds,
3143 fpinfo->remote_conds);
3146 * Construct EXPLAIN query including the desired SELECT, FROM, and
3147 * WHERE clauses. Params and other-relation Vars are replaced by dummy
3148 * values, so don't request params_list.
3150 initStringInfo(&sql);
3151 appendStringInfoString(&sql, "EXPLAIN ");
3152 deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
3153 remote_conds, pathkeys,
3154 fpextra ? fpextra->has_final_sort : false,
3155 fpextra ? fpextra->has_limit : false,
3156 false, &retrieved_attrs, NULL);
3158 /* Get the remote estimate */
3159 conn = GetConnection(fpinfo->user, false, NULL);
3160 get_remote_estimate(sql.data, conn, &rows, &width,
3161 &startup_cost, &total_cost);
3162 ReleaseConnection(conn);
3164 retrieved_rows = rows;
3166 /* Factor in the selectivity of the locally-checked quals */
3167 local_sel = clauselist_selectivity(root,
3168 local_param_join_conds,
3169 foreignrel->relid,
3170 JOIN_INNER,
3171 NULL);
3172 local_sel *= fpinfo->local_conds_sel;
3174 rows = clamp_row_est(rows * local_sel);
3176 /* Add in the eval cost of the locally-checked quals */
3177 startup_cost += fpinfo->local_conds_cost.startup;
3178 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3179 cost_qual_eval(&local_cost, local_param_join_conds, root);
3180 startup_cost += local_cost.startup;
3181 total_cost += local_cost.per_tuple * retrieved_rows;
3184 * Add in tlist eval cost for each output row. In case of an
3185 * aggregate, some of the tlist expressions such as grouping
3186 * expressions will be evaluated remotely, so adjust the costs.
3188 startup_cost += foreignrel->reltarget->cost.startup;
3189 total_cost += foreignrel->reltarget->cost.startup;
3190 total_cost += foreignrel->reltarget->cost.per_tuple * rows;
3191 if (IS_UPPER_REL(foreignrel))
3193 QualCost tlist_cost;
3195 cost_qual_eval(&tlist_cost, fdw_scan_tlist, root);
3196 startup_cost -= tlist_cost.startup;
3197 total_cost -= tlist_cost.startup;
3198 total_cost -= tlist_cost.per_tuple * rows;
3201 else
3203 Cost run_cost = 0;
3206 * We don't support join conditions in this mode (hence, no
3207 * parameterized paths can be made).
3209 Assert(param_join_conds == NIL);
3212 * We will come here again and again with different set of pathkeys or
3213 * additional post-scan/join-processing steps that caller wants to
3214 * cost. We don't need to calculate the cost/size estimates for the
3215 * underlying scan, join, or grouping each time. Instead, use those
3216 * estimates if we have cached them already.
3218 if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0)
3220 Assert(fpinfo->retrieved_rows >= 0);
3222 rows = fpinfo->rows;
3223 retrieved_rows = fpinfo->retrieved_rows;
3224 width = fpinfo->width;
3225 startup_cost = fpinfo->rel_startup_cost;
3226 run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
3229 * If we estimate the costs of a foreign scan or a foreign join
3230 * with additional post-scan/join-processing steps, the scan or
3231 * join costs obtained from the cache wouldn't yet contain the
3232 * eval costs for the final scan/join target, which would've been
3233 * updated by apply_scanjoin_target_to_paths(); add the eval costs
3234 * now.
3236 if (fpextra && !IS_UPPER_REL(foreignrel))
3238 /* Shouldn't get here unless we have LIMIT */
3239 Assert(fpextra->has_limit);
3240 Assert(foreignrel->reloptkind == RELOPT_BASEREL ||
3241 foreignrel->reloptkind == RELOPT_JOINREL);
3242 startup_cost += foreignrel->reltarget->cost.startup;
3243 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3246 else if (IS_JOIN_REL(foreignrel))
3248 PgFdwRelationInfo *fpinfo_i;
3249 PgFdwRelationInfo *fpinfo_o;
3250 QualCost join_cost;
3251 QualCost remote_conds_cost;
3252 double nrows;
3254 /* Use rows/width estimates made by the core code. */
3255 rows = foreignrel->rows;
3256 width = foreignrel->reltarget->width;
3258 /* For join we expect inner and outer relations set */
3259 Assert(fpinfo->innerrel && fpinfo->outerrel);
3261 fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
3262 fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
3264 /* Estimate of number of rows in cross product */
3265 nrows = fpinfo_i->rows * fpinfo_o->rows;
3268 * Back into an estimate of the number of retrieved rows. Just in
3269 * case this is nuts, clamp to at most nrows.
3271 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3272 retrieved_rows = Min(retrieved_rows, nrows);
3275 * The cost of foreign join is estimated as cost of generating
3276 * rows for the joining relations + cost for applying quals on the
3277 * rows.
3281 * Calculate the cost of clauses pushed down to the foreign server
3283 cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
3284 /* Calculate the cost of applying join clauses */
3285 cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
3288 * Startup cost includes startup cost of joining relations and the
3289 * startup cost for join and other clauses. We do not include the
3290 * startup cost specific to join strategy (e.g. setting up hash
3291 * tables) since we do not know what strategy the foreign server
3292 * is going to use.
3294 startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
3295 startup_cost += join_cost.startup;
3296 startup_cost += remote_conds_cost.startup;
3297 startup_cost += fpinfo->local_conds_cost.startup;
3300 * Run time cost includes:
3302 * 1. Run time cost (total_cost - startup_cost) of relations being
3303 * joined
3305 * 2. Run time cost of applying join clauses on the cross product
3306 * of the joining relations.
3308 * 3. Run time cost of applying pushed down other clauses on the
3309 * result of join
3311 * 4. Run time cost of applying nonpushable other clauses locally
3312 * on the result fetched from the foreign server.
3314 run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
3315 run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
3316 run_cost += nrows * join_cost.per_tuple;
3317 nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
3318 run_cost += nrows * remote_conds_cost.per_tuple;
3319 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3321 /* Add in tlist eval cost for each output row */
3322 startup_cost += foreignrel->reltarget->cost.startup;
3323 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3325 else if (IS_UPPER_REL(foreignrel))
3327 RelOptInfo *outerrel = fpinfo->outerrel;
3328 PgFdwRelationInfo *ofpinfo;
3329 AggClauseCosts aggcosts;
3330 double input_rows;
3331 int numGroupCols;
3332 double numGroups = 1;
3334 /* The upper relation should have its outer relation set */
3335 Assert(outerrel);
3336 /* and that outer relation should have its reltarget set */
3337 Assert(outerrel->reltarget);
3340 * This cost model is mixture of costing done for sorted and
3341 * hashed aggregates in cost_agg(). We are not sure which
3342 * strategy will be considered at remote side, thus for
3343 * simplicity, we put all startup related costs in startup_cost
3344 * and all finalization and run cost are added in total_cost.
3347 ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private;
3349 /* Get rows from input rel */
3350 input_rows = ofpinfo->rows;
3352 /* Collect statistics about aggregates for estimating costs. */
3353 MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
3354 if (root->parse->hasAggs)
3356 get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts);
3359 /* Get number of grouping columns and possible number of groups */
3360 numGroupCols = list_length(root->processed_groupClause);
3361 numGroups = estimate_num_groups(root,
3362 get_sortgrouplist_exprs(root->processed_groupClause,
3363 fpinfo->grouped_tlist),
3364 input_rows, NULL, NULL);
3367 * Get the retrieved_rows and rows estimates. If there are HAVING
3368 * quals, account for their selectivity.
3370 if (root->hasHavingQual)
3372 /* Factor in the selectivity of the remotely-checked quals */
3373 retrieved_rows =
3374 clamp_row_est(numGroups *
3375 clauselist_selectivity(root,
3376 fpinfo->remote_conds,
3378 JOIN_INNER,
3379 NULL));
3380 /* Factor in the selectivity of the locally-checked quals */
3381 rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel);
3383 else
3385 rows = retrieved_rows = numGroups;
3388 /* Use width estimate made by the core code. */
3389 width = foreignrel->reltarget->width;
3391 /*-----
3392 * Startup cost includes:
3393 * 1. Startup cost for underneath input relation, adjusted for
3394 * tlist replacement by apply_scanjoin_target_to_paths()
3395 * 2. Cost of performing aggregation, per cost_agg()
3396 *-----
3398 startup_cost = ofpinfo->rel_startup_cost;
3399 startup_cost += outerrel->reltarget->cost.startup;
3400 startup_cost += aggcosts.transCost.startup;
3401 startup_cost += aggcosts.transCost.per_tuple * input_rows;
3402 startup_cost += aggcosts.finalCost.startup;
3403 startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
3405 /*-----
3406 * Run time cost includes:
3407 * 1. Run time cost of underneath input relation, adjusted for
3408 * tlist replacement by apply_scanjoin_target_to_paths()
3409 * 2. Run time cost of performing aggregation, per cost_agg()
3410 *-----
3412 run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
3413 run_cost += outerrel->reltarget->cost.per_tuple * input_rows;
3414 run_cost += aggcosts.finalCost.per_tuple * numGroups;
3415 run_cost += cpu_tuple_cost * numGroups;
3417 /* Account for the eval cost of HAVING quals, if any */
3418 if (root->hasHavingQual)
3420 QualCost remote_cost;
3422 /* Add in the eval cost of the remotely-checked quals */
3423 cost_qual_eval(&remote_cost, fpinfo->remote_conds, root);
3424 startup_cost += remote_cost.startup;
3425 run_cost += remote_cost.per_tuple * numGroups;
3426 /* Add in the eval cost of the locally-checked quals */
3427 startup_cost += fpinfo->local_conds_cost.startup;
3428 run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
3431 /* Add in tlist eval cost for each output row */
3432 startup_cost += foreignrel->reltarget->cost.startup;
3433 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3435 else
3437 Cost cpu_per_tuple;
3439 /* Use rows/width estimates made by set_baserel_size_estimates. */
3440 rows = foreignrel->rows;
3441 width = foreignrel->reltarget->width;
3444 * Back into an estimate of the number of retrieved rows. Just in
3445 * case this is nuts, clamp to at most foreignrel->tuples.
3447 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
3448 retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
3451 * Cost as though this were a seqscan, which is pessimistic. We
3452 * effectively imagine the local_conds are being evaluated
3453 * remotely, too.
3455 startup_cost = 0;
3456 run_cost = 0;
3457 run_cost += seq_page_cost * foreignrel->pages;
3459 startup_cost += foreignrel->baserestrictcost.startup;
3460 cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
3461 run_cost += cpu_per_tuple * foreignrel->tuples;
3463 /* Add in tlist eval cost for each output row */
3464 startup_cost += foreignrel->reltarget->cost.startup;
3465 run_cost += foreignrel->reltarget->cost.per_tuple * rows;
3469 * Without remote estimates, we have no real way to estimate the cost
3470 * of generating sorted output. It could be free if the query plan
3471 * the remote side would have chosen generates properly-sorted output
3472 * anyway, but in most cases it will cost something. Estimate a value
3473 * high enough that we won't pick the sorted path when the ordering
3474 * isn't locally useful, but low enough that we'll err on the side of
3475 * pushing down the ORDER BY clause when it's useful to do so.
3477 if (pathkeys != NIL)
3479 if (IS_UPPER_REL(foreignrel))
3481 Assert(foreignrel->reloptkind == RELOPT_UPPER_REL &&
3482 fpinfo->stage == UPPERREL_GROUP_AGG);
3483 adjust_foreign_grouping_path_cost(root, pathkeys,
3484 retrieved_rows, width,
3485 fpextra->limit_tuples,
3486 &startup_cost, &run_cost);
3488 else
3490 startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3491 run_cost *= DEFAULT_FDW_SORT_MULTIPLIER;
3495 total_cost = startup_cost + run_cost;
3497 /* Adjust the cost estimates if we have LIMIT */
3498 if (fpextra && fpextra->has_limit)
3500 adjust_limit_rows_costs(&rows, &startup_cost, &total_cost,
3501 fpextra->offset_est, fpextra->count_est);
3502 retrieved_rows = rows;
3507 * If this includes the final sort step, the given target, which will be
3508 * applied to the resulting path, might have different expressions from
3509 * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist
3510 * eval costs.
3512 if (fpextra && fpextra->has_final_sort &&
3513 fpextra->target != foreignrel->reltarget)
3515 QualCost oldcost = foreignrel->reltarget->cost;
3516 QualCost newcost = fpextra->target->cost;
3518 startup_cost += newcost.startup - oldcost.startup;
3519 total_cost += newcost.startup - oldcost.startup;
3520 total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows;
3524 * Cache the retrieved rows and cost estimates for scans, joins, or
3525 * groupings without any parameterization, pathkeys, or additional
3526 * post-scan/join-processing steps, before adding the costs for
3527 * transferring data from the foreign server. These estimates are useful
3528 * for costing remote joins involving this relation or costing other
3529 * remote operations on this relation such as remote sorts and remote
3530 * LIMIT restrictions, when the costs can not be obtained from the foreign
3531 * server. This function will be called at least once for every foreign
3532 * relation without any parameterization, pathkeys, or additional
3533 * post-scan/join-processing steps.
3535 if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL)
3537 fpinfo->retrieved_rows = retrieved_rows;
3538 fpinfo->rel_startup_cost = startup_cost;
3539 fpinfo->rel_total_cost = total_cost;
3543 * Add some additional cost factors to account for connection overhead
3544 * (fdw_startup_cost), transferring data across the network
3545 * (fdw_tuple_cost per retrieved row), and local manipulation of the data
3546 * (cpu_tuple_cost per retrieved row).
3548 startup_cost += fpinfo->fdw_startup_cost;
3549 total_cost += fpinfo->fdw_startup_cost;
3550 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
3551 total_cost += cpu_tuple_cost * retrieved_rows;
3554 * If we have LIMIT, we should prefer performing the restriction remotely
3555 * rather than locally, as the former avoids extra row fetches from the
3556 * remote that the latter might cause. But since the core code doesn't
3557 * account for such fetches when estimating the costs of the local
3558 * restriction (see create_limit_path()), there would be no difference
3559 * between the costs of the local restriction and the costs of the remote
3560 * restriction estimated above if we don't use remote estimates (except
3561 * for the case where the foreignrel is a grouping relation, the given
3562 * pathkeys is not NIL, and the effects of a bounded sort for that rel is
3563 * accounted for in costing the remote restriction). Tweak the costs of
3564 * the remote restriction to ensure we'll prefer it if LIMIT is a useful
3565 * one.
3567 if (!fpinfo->use_remote_estimate &&
3568 fpextra && fpextra->has_limit &&
3569 fpextra->limit_tuples > 0 &&
3570 fpextra->limit_tuples < fpinfo->rows)
3572 Assert(fpinfo->rows > 0);
3573 total_cost -= (total_cost - startup_cost) * 0.05 *
3574 (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows;
3577 /* Return results. */
3578 *p_rows = rows;
3579 *p_width = width;
3580 *p_startup_cost = startup_cost;
3581 *p_total_cost = total_cost;
3585 * Estimate costs of executing a SQL statement remotely.
3586 * The given "sql" must be an EXPLAIN command.
3588 static void
3589 get_remote_estimate(const char *sql, PGconn *conn,
3590 double *rows, int *width,
3591 Cost *startup_cost, Cost *total_cost)
3593 PGresult *volatile res = NULL;
3595 /* PGresult must be released before leaving this function. */
3596 PG_TRY();
3598 char *line;
3599 char *p;
3600 int n;
3603 * Execute EXPLAIN remotely.
3605 res = pgfdw_exec_query(conn, sql, NULL);
3606 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3607 pgfdw_report_error(ERROR, res, conn, false, sql);
3610 * Extract cost numbers for topmost plan node. Note we search for a
3611 * left paren from the end of the line to avoid being confused by
3612 * other uses of parentheses.
3614 line = PQgetvalue(res, 0, 0);
3615 p = strrchr(line, '(');
3616 if (p == NULL)
3617 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3618 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
3619 startup_cost, total_cost, rows, width);
3620 if (n != 4)
3621 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
3623 PG_FINALLY();
3625 PQclear(res);
3627 PG_END_TRY();
3631 * Adjust the cost estimates of a foreign grouping path to include the cost of
3632 * generating properly-sorted output.
3634 static void
3635 adjust_foreign_grouping_path_cost(PlannerInfo *root,
3636 List *pathkeys,
3637 double retrieved_rows,
3638 double width,
3639 double limit_tuples,
3640 Cost *p_startup_cost,
3641 Cost *p_run_cost)
3644 * If the GROUP BY clause isn't sort-able, the plan chosen by the remote
3645 * side is unlikely to generate properly-sorted output, so it would need
3646 * an explicit sort; adjust the given costs with cost_sort(). Likewise,
3647 * if the GROUP BY clause is sort-able but isn't a superset of the given
3648 * pathkeys, adjust the costs with that function. Otherwise, adjust the
3649 * costs by applying the same heuristic as for the scan or join case.
3651 if (!grouping_is_sortable(root->processed_groupClause) ||
3652 !pathkeys_contained_in(pathkeys, root->group_pathkeys))
3654 Path sort_path; /* dummy for result of cost_sort */
3656 cost_sort(&sort_path,
3657 root,
3658 pathkeys,
3659 *p_startup_cost + *p_run_cost,
3660 retrieved_rows,
3661 width,
3662 0.0,
3663 work_mem,
3664 limit_tuples);
3666 *p_startup_cost = sort_path.startup_cost;
3667 *p_run_cost = sort_path.total_cost - sort_path.startup_cost;
3669 else
3672 * The default extra cost seems too large for foreign-grouping cases;
3673 * add 1/4th of that default.
3675 double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER
3676 - 1.0) * 0.25;
3678 *p_startup_cost *= sort_multiplier;
3679 *p_run_cost *= sort_multiplier;
3684 * Detect whether we want to process an EquivalenceClass member.
3686 * This is a callback for use by generate_implied_equalities_for_column.
3688 static bool
3689 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
3690 EquivalenceClass *ec, EquivalenceMember *em,
3691 void *arg)
3693 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
3694 Expr *expr = em->em_expr;
3697 * If we've identified what we're processing in the current scan, we only
3698 * want to match that expression.
3700 if (state->current != NULL)
3701 return equal(expr, state->current);
3704 * Otherwise, ignore anything we've already processed.
3706 if (list_member(state->already_used, expr))
3707 return false;
3709 /* This is the new target to process. */
3710 state->current = expr;
3711 return true;
3715 * Create cursor for node's query with current parameter values.
3717 static void
3718 create_cursor(ForeignScanState *node)
3720 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3721 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3722 int numParams = fsstate->numParams;
3723 const char **values = fsstate->param_values;
3724 PGconn *conn = fsstate->conn;
3725 StringInfoData buf;
3726 PGresult *res;
3728 /* First, process a pending asynchronous request, if any. */
3729 if (fsstate->conn_state->pendingAreq)
3730 process_pending_request(fsstate->conn_state->pendingAreq);
3733 * Construct array of query parameter values in text format. We do the
3734 * conversions in the short-lived per-tuple context, so as not to cause a
3735 * memory leak over repeated scans.
3737 if (numParams > 0)
3739 MemoryContext oldcontext;
3741 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
3743 process_query_params(econtext,
3744 fsstate->param_flinfo,
3745 fsstate->param_exprs,
3746 values);
3748 MemoryContextSwitchTo(oldcontext);
3751 /* Construct the DECLARE CURSOR command */
3752 initStringInfo(&buf);
3753 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
3754 fsstate->cursor_number, fsstate->query);
3757 * Notice that we pass NULL for paramTypes, thus forcing the remote server
3758 * to infer types for all parameters. Since we explicitly cast every
3759 * parameter (see deparse.c), the "inference" is trivial and will produce
3760 * the desired result. This allows us to avoid assuming that the remote
3761 * server has the same OIDs we do for the parameters' types.
3763 if (!PQsendQueryParams(conn, buf.data, numParams,
3764 NULL, values, NULL, NULL, 0))
3765 pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
3768 * Get the result, and check for success.
3770 * We don't use a PG_TRY block here, so be careful not to throw error
3771 * without releasing the PGresult.
3773 res = pgfdw_get_result(conn);
3774 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3775 pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
3776 PQclear(res);
3778 /* Mark the cursor as created, and show no tuples have been retrieved */
3779 fsstate->cursor_exists = true;
3780 fsstate->tuples = NULL;
3781 fsstate->num_tuples = 0;
3782 fsstate->next_tuple = 0;
3783 fsstate->fetch_ct_2 = 0;
3784 fsstate->eof_reached = false;
3786 /* Clean up */
3787 pfree(buf.data);
3791 * Fetch some more rows from the node's cursor.
3793 static void
3794 fetch_more_data(ForeignScanState *node)
3796 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
3797 PGresult *volatile res = NULL;
3798 MemoryContext oldcontext;
3801 * We'll store the tuples in the batch_cxt. First, flush the previous
3802 * batch.
3804 fsstate->tuples = NULL;
3805 MemoryContextReset(fsstate->batch_cxt);
3806 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
3808 /* PGresult must be released before leaving this function. */
3809 PG_TRY();
3811 PGconn *conn = fsstate->conn;
3812 int numrows;
3813 int i;
3815 if (fsstate->async_capable)
3817 Assert(fsstate->conn_state->pendingAreq);
3820 * The query was already sent by an earlier call to
3821 * fetch_more_data_begin. So now we just fetch the result.
3823 res = pgfdw_get_result(conn);
3824 /* On error, report the original query, not the FETCH. */
3825 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3826 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3828 /* Reset per-connection state */
3829 fsstate->conn_state->pendingAreq = NULL;
3831 else
3833 char sql[64];
3835 /* This is a regular synchronous fetch. */
3836 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
3837 fsstate->fetch_size, fsstate->cursor_number);
3839 res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
3840 /* On error, report the original query, not the FETCH. */
3841 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3842 pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
3845 /* Convert the data into HeapTuples */
3846 numrows = PQntuples(res);
3847 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
3848 fsstate->num_tuples = numrows;
3849 fsstate->next_tuple = 0;
3851 for (i = 0; i < numrows; i++)
3853 Assert(IsA(node->ss.ps.plan, ForeignScan));
3855 fsstate->tuples[i] =
3856 make_tuple_from_result_row(res, i,
3857 fsstate->rel,
3858 fsstate->attinmeta,
3859 fsstate->retrieved_attrs,
3860 node,
3861 fsstate->temp_cxt);
3864 /* Update fetch_ct_2 */
3865 if (fsstate->fetch_ct_2 < 2)
3866 fsstate->fetch_ct_2++;
3868 /* Must be EOF if we didn't get as many tuples as we asked for. */
3869 fsstate->eof_reached = (numrows < fsstate->fetch_size);
3871 PG_FINALLY();
3873 PQclear(res);
3875 PG_END_TRY();
3877 MemoryContextSwitchTo(oldcontext);
3881 * Force assorted GUC parameters to settings that ensure that we'll output
3882 * data values in a form that is unambiguous to the remote server.
3884 * This is rather expensive and annoying to do once per row, but there's
3885 * little choice if we want to be sure values are transmitted accurately;
3886 * we can't leave the settings in place between rows for fear of affecting
3887 * user-visible computations.
3889 * We use the equivalent of a function SET option to allow the settings to
3890 * persist only until the caller calls reset_transmission_modes(). If an
3891 * error is thrown in between, guc.c will take care of undoing the settings.
3893 * The return value is the nestlevel that must be passed to
3894 * reset_transmission_modes() to undo things.
3897 set_transmission_modes(void)
3899 int nestlevel = NewGUCNestLevel();
3902 * The values set here should match what pg_dump does. See also
3903 * configure_remote_session in connection.c.
3905 if (DateStyle != USE_ISO_DATES)
3906 (void) set_config_option("datestyle", "ISO",
3907 PGC_USERSET, PGC_S_SESSION,
3908 GUC_ACTION_SAVE, true, 0, false);
3909 if (IntervalStyle != INTSTYLE_POSTGRES)
3910 (void) set_config_option("intervalstyle", "postgres",
3911 PGC_USERSET, PGC_S_SESSION,
3912 GUC_ACTION_SAVE, true, 0, false);
3913 if (extra_float_digits < 3)
3914 (void) set_config_option("extra_float_digits", "3",
3915 PGC_USERSET, PGC_S_SESSION,
3916 GUC_ACTION_SAVE, true, 0, false);
3919 * In addition force restrictive search_path, in case there are any
3920 * regproc or similar constants to be printed.
3922 (void) set_config_option("search_path", "pg_catalog",
3923 PGC_USERSET, PGC_S_SESSION,
3924 GUC_ACTION_SAVE, true, 0, false);
3926 return nestlevel;
3930 * Undo the effects of set_transmission_modes().
3932 void
3933 reset_transmission_modes(int nestlevel)
3935 AtEOXact_GUC(true, nestlevel);
3939 * Utility routine to close a cursor.
3941 static void
3942 close_cursor(PGconn *conn, unsigned int cursor_number,
3943 PgFdwConnState *conn_state)
3945 char sql[64];
3946 PGresult *res;
3948 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
3951 * We don't use a PG_TRY block here, so be careful not to throw error
3952 * without releasing the PGresult.
3954 res = pgfdw_exec_query(conn, sql, conn_state);
3955 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3956 pgfdw_report_error(ERROR, res, conn, true, sql);
3957 PQclear(res);
3961 * create_foreign_modify
3962 * Construct an execution state of a foreign insert/update/delete
3963 * operation
3965 static PgFdwModifyState *
3966 create_foreign_modify(EState *estate,
3967 RangeTblEntry *rte,
3968 ResultRelInfo *resultRelInfo,
3969 CmdType operation,
3970 Plan *subplan,
3971 char *query,
3972 List *target_attrs,
3973 int values_end,
3974 bool has_returning,
3975 List *retrieved_attrs)
3977 PgFdwModifyState *fmstate;
3978 Relation rel = resultRelInfo->ri_RelationDesc;
3979 TupleDesc tupdesc = RelationGetDescr(rel);
3980 Oid userid;
3981 ForeignTable *table;
3982 UserMapping *user;
3983 AttrNumber n_params;
3984 Oid typefnoid;
3985 bool isvarlena;
3986 ListCell *lc;
3988 /* Begin constructing PgFdwModifyState. */
3989 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3990 fmstate->rel = rel;
3992 /* Identify which user to do the remote access as. */
3993 userid = ExecGetResultRelCheckAsUser(resultRelInfo, estate);
3995 /* Get info about foreign table. */
3996 table = GetForeignTable(RelationGetRelid(rel));
3997 user = GetUserMapping(userid, table->serverid);
3999 /* Open connection; report that we'll create a prepared statement. */
4000 fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
4001 fmstate->p_name = NULL; /* prepared statement not made yet */
4003 /* Set up remote query information. */
4004 fmstate->query = query;
4005 if (operation == CMD_INSERT)
4007 fmstate->query = pstrdup(fmstate->query);
4008 fmstate->orig_query = pstrdup(fmstate->query);
4010 fmstate->target_attrs = target_attrs;
4011 fmstate->values_end = values_end;
4012 fmstate->has_returning = has_returning;
4013 fmstate->retrieved_attrs = retrieved_attrs;
4015 /* Create context for per-tuple temp workspace. */
4016 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
4017 "postgres_fdw temporary data",
4018 ALLOCSET_SMALL_SIZES);
4020 /* Prepare for input conversion of RETURNING results. */
4021 if (fmstate->has_returning)
4022 fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
4024 /* Prepare for output conversion of parameters used in prepared stmt. */
4025 n_params = list_length(fmstate->target_attrs) + 1;
4026 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
4027 fmstate->p_nums = 0;
4029 if (operation == CMD_UPDATE || operation == CMD_DELETE)
4031 Assert(subplan != NULL);
4033 /* Find the ctid resjunk column in the subplan's result */
4034 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
4035 "ctid");
4036 if (!AttributeNumberIsValid(fmstate->ctidAttno))
4037 elog(ERROR, "could not find junk ctid column");
4039 /* First transmittable parameter will be ctid */
4040 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
4041 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4042 fmstate->p_nums++;
4045 if (operation == CMD_INSERT || operation == CMD_UPDATE)
4047 /* Set up for remaining transmittable parameters */
4048 foreach(lc, fmstate->target_attrs)
4050 int attnum = lfirst_int(lc);
4051 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4053 Assert(!attr->attisdropped);
4055 /* Ignore generated columns; they are set to DEFAULT */
4056 if (attr->attgenerated)
4057 continue;
4058 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
4059 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
4060 fmstate->p_nums++;
4064 Assert(fmstate->p_nums <= n_params);
4066 /* Set batch_size from foreign server/table options. */
4067 if (operation == CMD_INSERT)
4068 fmstate->batch_size = get_batch_size_option(rel);
4070 fmstate->num_slots = 1;
4072 /* Initialize auxiliary state */
4073 fmstate->aux_fmstate = NULL;
4075 return fmstate;
4079 * execute_foreign_modify
4080 * Perform foreign-table modification as required, and fetch RETURNING
4081 * result if any. (This is the shared guts of postgresExecForeignInsert,
4082 * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
4083 * postgresExecForeignDelete.)
4085 static TupleTableSlot **
4086 execute_foreign_modify(EState *estate,
4087 ResultRelInfo *resultRelInfo,
4088 CmdType operation,
4089 TupleTableSlot **slots,
4090 TupleTableSlot **planSlots,
4091 int *numSlots)
4093 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
4094 ItemPointer ctid = NULL;
4095 const char **p_values;
4096 PGresult *res;
4097 int n_rows;
4098 StringInfoData sql;
4100 /* The operation should be INSERT, UPDATE, or DELETE */
4101 Assert(operation == CMD_INSERT ||
4102 operation == CMD_UPDATE ||
4103 operation == CMD_DELETE);
4105 /* First, process a pending asynchronous request, if any. */
4106 if (fmstate->conn_state->pendingAreq)
4107 process_pending_request(fmstate->conn_state->pendingAreq);
4110 * If the existing query was deparsed and prepared for a different number
4111 * of rows, rebuild it for the proper number.
4113 if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
4115 /* Destroy the prepared statement created previously */
4116 if (fmstate->p_name)
4117 deallocate_query(fmstate);
4119 /* Build INSERT string with numSlots records in its VALUES clause. */
4120 initStringInfo(&sql);
4121 rebuildInsertSql(&sql, fmstate->rel,
4122 fmstate->orig_query, fmstate->target_attrs,
4123 fmstate->values_end, fmstate->p_nums,
4124 *numSlots - 1);
4125 pfree(fmstate->query);
4126 fmstate->query = sql.data;
4127 fmstate->num_slots = *numSlots;
4130 /* Set up the prepared statement on the remote server, if we didn't yet */
4131 if (!fmstate->p_name)
4132 prepare_foreign_modify(fmstate);
4135 * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column
4137 if (operation == CMD_UPDATE || operation == CMD_DELETE)
4139 Datum datum;
4140 bool isNull;
4142 datum = ExecGetJunkAttribute(planSlots[0],
4143 fmstate->ctidAttno,
4144 &isNull);
4145 /* shouldn't ever get a null result... */
4146 if (isNull)
4147 elog(ERROR, "ctid is NULL");
4148 ctid = (ItemPointer) DatumGetPointer(datum);
4151 /* Convert parameters needed by prepared statement to text form */
4152 p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
4155 * Execute the prepared statement.
4157 if (!PQsendQueryPrepared(fmstate->conn,
4158 fmstate->p_name,
4159 fmstate->p_nums * (*numSlots),
4160 p_values,
4161 NULL,
4162 NULL,
4164 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4167 * Get the result, and check for success.
4169 * We don't use a PG_TRY block here, so be careful not to throw error
4170 * without releasing the PGresult.
4172 res = pgfdw_get_result(fmstate->conn);
4173 if (PQresultStatus(res) !=
4174 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4175 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4177 /* Check number of rows affected, and fetch RETURNING tuple if any */
4178 if (fmstate->has_returning)
4180 Assert(*numSlots == 1);
4181 n_rows = PQntuples(res);
4182 if (n_rows > 0)
4183 store_returning_result(fmstate, slots[0], res);
4185 else
4186 n_rows = atoi(PQcmdTuples(res));
4188 /* And clean up */
4189 PQclear(res);
4191 MemoryContextReset(fmstate->temp_cxt);
4193 *numSlots = n_rows;
4196 * Return NULL if nothing was inserted/updated/deleted on the remote end
4198 return (n_rows > 0) ? slots : NULL;
4202 * prepare_foreign_modify
4203 * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
4205 static void
4206 prepare_foreign_modify(PgFdwModifyState *fmstate)
4208 char prep_name[NAMEDATALEN];
4209 char *p_name;
4210 PGresult *res;
4213 * The caller would already have processed a pending asynchronous request
4214 * if any, so no need to do it here.
4217 /* Construct name we'll use for the prepared statement. */
4218 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
4219 GetPrepStmtNumber(fmstate->conn));
4220 p_name = pstrdup(prep_name);
4223 * We intentionally do not specify parameter types here, but leave the
4224 * remote server to derive them by default. This avoids possible problems
4225 * with the remote server using different type OIDs than we do. All of
4226 * the prepared statements we use in this module are simple enough that
4227 * the remote server will make the right choices.
4229 if (!PQsendPrepare(fmstate->conn,
4230 p_name,
4231 fmstate->query,
4233 NULL))
4234 pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
4237 * Get the result, and check for success.
4239 * We don't use a PG_TRY block here, so be careful not to throw error
4240 * without releasing the PGresult.
4242 res = pgfdw_get_result(fmstate->conn);
4243 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4244 pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
4245 PQclear(res);
4247 /* This action shows that the prepare has been done. */
4248 fmstate->p_name = p_name;
4252 * convert_prep_stmt_params
4253 * Create array of text strings representing parameter values
4255 * tupleid is ctid to send, or NULL if none
4256 * slot is slot to get remaining parameters from, or NULL if none
4258 * Data is constructed in temp_cxt; caller should reset that after use.
4260 static const char **
4261 convert_prep_stmt_params(PgFdwModifyState *fmstate,
4262 ItemPointer tupleid,
4263 TupleTableSlot **slots,
4264 int numSlots)
4266 const char **p_values;
4267 int i;
4268 int j;
4269 int pindex = 0;
4270 MemoryContext oldcontext;
4272 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
4274 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
4276 /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
4277 Assert(!(tupleid != NULL && numSlots > 1));
4279 /* 1st parameter should be ctid, if it's in use */
4280 if (tupleid != NULL)
4282 Assert(numSlots == 1);
4283 /* don't need set_transmission_modes for TID output */
4284 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
4285 PointerGetDatum(tupleid));
4286 pindex++;
4289 /* get following parameters from slots */
4290 if (slots != NULL && fmstate->target_attrs != NIL)
4292 TupleDesc tupdesc = RelationGetDescr(fmstate->rel);
4293 int nestlevel;
4294 ListCell *lc;
4296 nestlevel = set_transmission_modes();
4298 for (i = 0; i < numSlots; i++)
4300 j = (tupleid != NULL) ? 1 : 0;
4301 foreach(lc, fmstate->target_attrs)
4303 int attnum = lfirst_int(lc);
4304 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
4305 Datum value;
4306 bool isnull;
4308 /* Ignore generated columns; they are set to DEFAULT */
4309 if (attr->attgenerated)
4310 continue;
4311 value = slot_getattr(slots[i], attnum, &isnull);
4312 if (isnull)
4313 p_values[pindex] = NULL;
4314 else
4315 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
4316 value);
4317 pindex++;
4318 j++;
4322 reset_transmission_modes(nestlevel);
4325 Assert(pindex == fmstate->p_nums * numSlots);
4327 MemoryContextSwitchTo(oldcontext);
4329 return p_values;
4333 * store_returning_result
4334 * Store the result of a RETURNING clause
4336 * On error, be sure to release the PGresult on the way out. Callers do not
4337 * have PG_TRY blocks to ensure this happens.
4339 static void
4340 store_returning_result(PgFdwModifyState *fmstate,
4341 TupleTableSlot *slot, PGresult *res)
4343 PG_TRY();
4345 HeapTuple newtup;
4347 newtup = make_tuple_from_result_row(res, 0,
4348 fmstate->rel,
4349 fmstate->attinmeta,
4350 fmstate->retrieved_attrs,
4351 NULL,
4352 fmstate->temp_cxt);
4355 * The returning slot will not necessarily be suitable to store
4356 * heaptuples directly, so allow for conversion.
4358 ExecForceStoreHeapTuple(newtup, slot, true);
4360 PG_CATCH();
4362 PQclear(res);
4363 PG_RE_THROW();
4365 PG_END_TRY();
4369 * finish_foreign_modify
4370 * Release resources for a foreign insert/update/delete operation
4372 static void
4373 finish_foreign_modify(PgFdwModifyState *fmstate)
4375 Assert(fmstate != NULL);
4377 /* If we created a prepared statement, destroy it */
4378 deallocate_query(fmstate);
4380 /* Release remote connection */
4381 ReleaseConnection(fmstate->conn);
4382 fmstate->conn = NULL;
4386 * deallocate_query
4387 * Deallocate a prepared statement for a foreign insert/update/delete
4388 * operation
4390 static void
4391 deallocate_query(PgFdwModifyState *fmstate)
4393 char sql[64];
4394 PGresult *res;
4396 /* do nothing if the query is not allocated */
4397 if (!fmstate->p_name)
4398 return;
4400 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
4403 * We don't use a PG_TRY block here, so be careful not to throw error
4404 * without releasing the PGresult.
4406 res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
4407 if (PQresultStatus(res) != PGRES_COMMAND_OK)
4408 pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
4409 PQclear(res);
4410 pfree(fmstate->p_name);
4411 fmstate->p_name = NULL;
4415 * build_remote_returning
4416 * Build a RETURNING targetlist of a remote query for performing an
4417 * UPDATE/DELETE .. RETURNING on a join directly
4419 static List *
4420 build_remote_returning(Index rtindex, Relation rel, List *returningList)
4422 bool have_wholerow = false;
4423 List *tlist = NIL;
4424 List *vars;
4425 ListCell *lc;
4427 Assert(returningList);
4429 vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
4432 * If there's a whole-row reference to the target relation, then we'll
4433 * need all the columns of the relation.
4435 foreach(lc, vars)
4437 Var *var = (Var *) lfirst(lc);
4439 if (IsA(var, Var) &&
4440 var->varno == rtindex &&
4441 var->varattno == InvalidAttrNumber)
4443 have_wholerow = true;
4444 break;
4448 if (have_wholerow)
4450 TupleDesc tupdesc = RelationGetDescr(rel);
4451 int i;
4453 for (i = 1; i <= tupdesc->natts; i++)
4455 Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
4456 Var *var;
4458 /* Ignore dropped attributes. */
4459 if (attr->attisdropped)
4460 continue;
4462 var = makeVar(rtindex,
4464 attr->atttypid,
4465 attr->atttypmod,
4466 attr->attcollation,
4469 tlist = lappend(tlist,
4470 makeTargetEntry((Expr *) var,
4471 list_length(tlist) + 1,
4472 NULL,
4473 false));
4477 /* Now add any remaining columns to tlist. */
4478 foreach(lc, vars)
4480 Var *var = (Var *) lfirst(lc);
4483 * No need for whole-row references to the target relation. We don't
4484 * need system columns other than ctid and oid either, since those are
4485 * set locally.
4487 if (IsA(var, Var) &&
4488 var->varno == rtindex &&
4489 var->varattno <= InvalidAttrNumber &&
4490 var->varattno != SelfItemPointerAttributeNumber)
4491 continue; /* don't need it */
4493 if (tlist_member((Expr *) var, tlist))
4494 continue; /* already got it */
4496 tlist = lappend(tlist,
4497 makeTargetEntry((Expr *) var,
4498 list_length(tlist) + 1,
4499 NULL,
4500 false));
4503 list_free(vars);
4505 return tlist;
4509 * rebuild_fdw_scan_tlist
4510 * Build new fdw_scan_tlist of given foreign-scan plan node from given
4511 * tlist
4513 * There might be columns that the fdw_scan_tlist of the given foreign-scan
4514 * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
4515 * have contained resjunk columns such as 'ctid' of the target relation and
4516 * 'wholerow' of non-target relations, but the tlist might not contain them,
4517 * for example. So, adjust the tlist so it contains all the columns specified
4518 * in the fdw_scan_tlist; else setrefs.c will get confused.
4520 static void
4521 rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
4523 List *new_tlist = tlist;
4524 List *old_tlist = fscan->fdw_scan_tlist;
4525 ListCell *lc;
4527 foreach(lc, old_tlist)
4529 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4531 if (tlist_member(tle->expr, new_tlist))
4532 continue; /* already got it */
4534 new_tlist = lappend(new_tlist,
4535 makeTargetEntry(tle->expr,
4536 list_length(new_tlist) + 1,
4537 NULL,
4538 false));
4540 fscan->fdw_scan_tlist = new_tlist;
4544 * Execute a direct UPDATE/DELETE statement.
4546 static void
4547 execute_dml_stmt(ForeignScanState *node)
4549 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4550 ExprContext *econtext = node->ss.ps.ps_ExprContext;
4551 int numParams = dmstate->numParams;
4552 const char **values = dmstate->param_values;
4554 /* First, process a pending asynchronous request, if any. */
4555 if (dmstate->conn_state->pendingAreq)
4556 process_pending_request(dmstate->conn_state->pendingAreq);
4559 * Construct array of query parameter values in text format.
4561 if (numParams > 0)
4562 process_query_params(econtext,
4563 dmstate->param_flinfo,
4564 dmstate->param_exprs,
4565 values);
4568 * Notice that we pass NULL for paramTypes, thus forcing the remote server
4569 * to infer types for all parameters. Since we explicitly cast every
4570 * parameter (see deparse.c), the "inference" is trivial and will produce
4571 * the desired result. This allows us to avoid assuming that the remote
4572 * server has the same OIDs we do for the parameters' types.
4574 if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
4575 NULL, values, NULL, NULL, 0))
4576 pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
4579 * Get the result, and check for success.
4581 * We don't use a PG_TRY block here, so be careful not to throw error
4582 * without releasing the PGresult.
4584 dmstate->result = pgfdw_get_result(dmstate->conn);
4585 if (PQresultStatus(dmstate->result) !=
4586 (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
4587 pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
4588 dmstate->query);
4590 /* Get the number of rows affected. */
4591 if (dmstate->has_returning)
4592 dmstate->num_tuples = PQntuples(dmstate->result);
4593 else
4594 dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
4598 * Get the result of a RETURNING clause.
4600 static TupleTableSlot *
4601 get_returning_data(ForeignScanState *node)
4603 PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
4604 EState *estate = node->ss.ps.state;
4605 ResultRelInfo *resultRelInfo = node->resultRelInfo;
4606 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
4607 TupleTableSlot *resultSlot;
4609 Assert(resultRelInfo->ri_projectReturning);
4611 /* If we didn't get any tuples, must be end of data. */
4612 if (dmstate->next_tuple >= dmstate->num_tuples)
4613 return ExecClearTuple(slot);
4615 /* Increment the command es_processed count if necessary. */
4616 if (dmstate->set_processed)
4617 estate->es_processed += 1;
4620 * Store a RETURNING tuple. If has_returning is false, just emit a dummy
4621 * tuple. (has_returning is false when the local query is of the form
4622 * "UPDATE/DELETE .. RETURNING 1" for example.)
4624 if (!dmstate->has_returning)
4626 ExecStoreAllNullTuple(slot);
4627 resultSlot = slot;
4629 else
4632 * On error, be sure to release the PGresult on the way out. Callers
4633 * do not have PG_TRY blocks to ensure this happens.
4635 PG_TRY();
4637 HeapTuple newtup;
4639 newtup = make_tuple_from_result_row(dmstate->result,
4640 dmstate->next_tuple,
4641 dmstate->rel,
4642 dmstate->attinmeta,
4643 dmstate->retrieved_attrs,
4644 node,
4645 dmstate->temp_cxt);
4646 ExecStoreHeapTuple(newtup, slot, false);
4648 PG_CATCH();
4650 PQclear(dmstate->result);
4651 PG_RE_THROW();
4653 PG_END_TRY();
4655 /* Get the updated/deleted tuple. */
4656 if (dmstate->rel)
4657 resultSlot = slot;
4658 else
4659 resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate);
4661 dmstate->next_tuple++;
4663 /* Make slot available for evaluation of the local query RETURNING list. */
4664 resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
4665 resultSlot;
4667 return slot;
4671 * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
4673 static void
4674 init_returning_filter(PgFdwDirectModifyState *dmstate,
4675 List *fdw_scan_tlist,
4676 Index rtindex)
4678 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4679 ListCell *lc;
4680 int i;
4683 * Calculate the mapping between the fdw_scan_tlist's entries and the
4684 * result tuple's attributes.
4686 * The "map" is an array of indexes of the result tuple's attributes in
4687 * fdw_scan_tlist, i.e., one entry for every attribute of the result
4688 * tuple. We store zero for any attributes that don't have the
4689 * corresponding entries in that list, marking that a NULL is needed in
4690 * the result tuple.
4692 * Also get the indexes of the entries for ctid and oid if any.
4694 dmstate->attnoMap = (AttrNumber *)
4695 palloc0(resultTupType->natts * sizeof(AttrNumber));
4697 dmstate->ctidAttno = dmstate->oidAttno = 0;
4699 i = 1;
4700 dmstate->hasSystemCols = false;
4701 foreach(lc, fdw_scan_tlist)
4703 TargetEntry *tle = (TargetEntry *) lfirst(lc);
4704 Var *var = (Var *) tle->expr;
4706 Assert(IsA(var, Var));
4709 * If the Var is a column of the target relation to be retrieved from
4710 * the foreign server, get the index of the entry.
4712 if (var->varno == rtindex &&
4713 list_member_int(dmstate->retrieved_attrs, i))
4715 int attrno = var->varattno;
4717 if (attrno < 0)
4720 * We don't retrieve system columns other than ctid and oid.
4722 if (attrno == SelfItemPointerAttributeNumber)
4723 dmstate->ctidAttno = i;
4724 else
4725 Assert(false);
4726 dmstate->hasSystemCols = true;
4728 else
4731 * We don't retrieve whole-row references to the target
4732 * relation either.
4734 Assert(attrno > 0);
4736 dmstate->attnoMap[attrno - 1] = i;
4739 i++;
4744 * Extract and return an updated/deleted tuple from a scan tuple.
4746 static TupleTableSlot *
4747 apply_returning_filter(PgFdwDirectModifyState *dmstate,
4748 ResultRelInfo *resultRelInfo,
4749 TupleTableSlot *slot,
4750 EState *estate)
4752 TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
4753 TupleTableSlot *resultSlot;
4754 Datum *values;
4755 bool *isnull;
4756 Datum *old_values;
4757 bool *old_isnull;
4758 int i;
4761 * Use the return tuple slot as a place to store the result tuple.
4763 resultSlot = ExecGetReturningSlot(estate, resultRelInfo);
4766 * Extract all the values of the scan tuple.
4768 slot_getallattrs(slot);
4769 old_values = slot->tts_values;
4770 old_isnull = slot->tts_isnull;
4773 * Prepare to build the result tuple.
4775 ExecClearTuple(resultSlot);
4776 values = resultSlot->tts_values;
4777 isnull = resultSlot->tts_isnull;
4780 * Transpose data into proper fields of the result tuple.
4782 for (i = 0; i < resultTupType->natts; i++)
4784 int j = dmstate->attnoMap[i];
4786 if (j == 0)
4788 values[i] = (Datum) 0;
4789 isnull[i] = true;
4791 else
4793 values[i] = old_values[j - 1];
4794 isnull[i] = old_isnull[j - 1];
4799 * Build the virtual tuple.
4801 ExecStoreVirtualTuple(resultSlot);
4804 * If we have any system columns to return, materialize a heap tuple in
4805 * the slot from column values set above and install system columns in
4806 * that tuple.
4808 if (dmstate->hasSystemCols)
4810 HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL);
4812 /* ctid */
4813 if (dmstate->ctidAttno)
4815 ItemPointer ctid = NULL;
4817 ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
4818 resultTup->t_self = *ctid;
4822 * And remaining columns
4824 * Note: since we currently don't allow the target relation to appear
4825 * on the nullable side of an outer join, any system columns wouldn't
4826 * go to NULL.
4828 * Note: no need to care about tableoid here because it will be
4829 * initialized in ExecProcessReturning().
4831 HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
4832 HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
4833 HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
4837 * And return the result tuple.
4839 return resultSlot;
4843 * Prepare for processing of parameters used in remote query.
4845 static void
4846 prepare_query_params(PlanState *node,
4847 List *fdw_exprs,
4848 int numParams,
4849 FmgrInfo **param_flinfo,
4850 List **param_exprs,
4851 const char ***param_values)
4853 int i;
4854 ListCell *lc;
4856 Assert(numParams > 0);
4858 /* Prepare for output conversion of parameters used in remote query. */
4859 *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
4861 i = 0;
4862 foreach(lc, fdw_exprs)
4864 Node *param_expr = (Node *) lfirst(lc);
4865 Oid typefnoid;
4866 bool isvarlena;
4868 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
4869 fmgr_info(typefnoid, &(*param_flinfo)[i]);
4870 i++;
4874 * Prepare remote-parameter expressions for evaluation. (Note: in
4875 * practice, we expect that all these expressions will be just Params, so
4876 * we could possibly do something more efficient than using the full
4877 * expression-eval machinery for this. But probably there would be little
4878 * benefit, and it'd require postgres_fdw to know more than is desirable
4879 * about Param evaluation.)
4881 *param_exprs = ExecInitExprList(fdw_exprs, node);
4883 /* Allocate buffer for text form of query parameters. */
4884 *param_values = (const char **) palloc0(numParams * sizeof(char *));
4888 * Construct array of query parameter values in text format.
4890 static void
4891 process_query_params(ExprContext *econtext,
4892 FmgrInfo *param_flinfo,
4893 List *param_exprs,
4894 const char **param_values)
4896 int nestlevel;
4897 int i;
4898 ListCell *lc;
4900 nestlevel = set_transmission_modes();
4902 i = 0;
4903 foreach(lc, param_exprs)
4905 ExprState *expr_state = (ExprState *) lfirst(lc);
4906 Datum expr_value;
4907 bool isNull;
4909 /* Evaluate the parameter expression */
4910 expr_value = ExecEvalExpr(expr_state, econtext, &isNull);
4913 * Get string representation of each parameter value by invoking
4914 * type-specific output function, unless the value is null.
4916 if (isNull)
4917 param_values[i] = NULL;
4918 else
4919 param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
4921 i++;
4924 reset_transmission_modes(nestlevel);
4928 * postgresAnalyzeForeignTable
4929 * Test whether analyzing this foreign table is supported
4931 static bool
4932 postgresAnalyzeForeignTable(Relation relation,
4933 AcquireSampleRowsFunc *func,
4934 BlockNumber *totalpages)
4936 ForeignTable *table;
4937 UserMapping *user;
4938 PGconn *conn;
4939 StringInfoData sql;
4940 PGresult *volatile res = NULL;
4942 /* Return the row-analysis function pointer */
4943 *func = postgresAcquireSampleRowsFunc;
4946 * Now we have to get the number of pages. It's annoying that the ANALYZE
4947 * API requires us to return that now, because it forces some duplication
4948 * of effort between this routine and postgresAcquireSampleRowsFunc. But
4949 * it's probably not worth redefining that API at this point.
4953 * Get the connection to use. We do the remote access as the table's
4954 * owner, even if the ANALYZE was started by some other user.
4956 table = GetForeignTable(RelationGetRelid(relation));
4957 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
4958 conn = GetConnection(user, false, NULL);
4961 * Construct command to get page count for relation.
4963 initStringInfo(&sql);
4964 deparseAnalyzeSizeSql(&sql, relation);
4966 /* In what follows, do not risk leaking any PGresults. */
4967 PG_TRY();
4969 res = pgfdw_exec_query(conn, sql.data, NULL);
4970 if (PQresultStatus(res) != PGRES_TUPLES_OK)
4971 pgfdw_report_error(ERROR, res, conn, false, sql.data);
4973 if (PQntuples(res) != 1 || PQnfields(res) != 1)
4974 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
4975 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
4977 PG_FINALLY();
4979 PQclear(res);
4981 PG_END_TRY();
4983 ReleaseConnection(conn);
4985 return true;
4989 * postgresGetAnalyzeInfoForForeignTable
4990 * Count tuples in foreign table (just get pg_class.reltuples).
4992 * can_tablesample determines if the remote relation supports acquiring the
4993 * sample using TABLESAMPLE.
4995 static double
4996 postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
4998 ForeignTable *table;
4999 UserMapping *user;
5000 PGconn *conn;
5001 StringInfoData sql;
5002 PGresult *volatile res = NULL;
5003 volatile double reltuples = -1;
5004 volatile char relkind = 0;
5006 /* assume the remote relation does not support TABLESAMPLE */
5007 *can_tablesample = false;
5010 * Get the connection to use. We do the remote access as the table's
5011 * owner, even if the ANALYZE was started by some other user.
5013 table = GetForeignTable(RelationGetRelid(relation));
5014 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5015 conn = GetConnection(user, false, NULL);
5018 * Construct command to get page count for relation.
5020 initStringInfo(&sql);
5021 deparseAnalyzeInfoSql(&sql, relation);
5023 /* In what follows, do not risk leaking any PGresults. */
5024 PG_TRY();
5026 res = pgfdw_exec_query(conn, sql.data, NULL);
5027 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5028 pgfdw_report_error(ERROR, res, conn, false, sql.data);
5030 if (PQntuples(res) != 1 || PQnfields(res) != 2)
5031 elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
5032 reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
5033 relkind = *(PQgetvalue(res, 0, 1));
5035 PG_FINALLY();
5037 if (res)
5038 PQclear(res);
5040 PG_END_TRY();
5042 ReleaseConnection(conn);
5044 /* TABLESAMPLE is supported only for regular tables and matviews */
5045 *can_tablesample = (relkind == RELKIND_RELATION ||
5046 relkind == RELKIND_MATVIEW ||
5047 relkind == RELKIND_PARTITIONED_TABLE);
5049 return reltuples;
5053 * Acquire a random sample of rows from foreign table managed by postgres_fdw.
5055 * Selected rows are returned in the caller-allocated array rows[],
5056 * which must have at least targrows entries.
5057 * The actual number of rows selected is returned as the function result.
5058 * We also count the total number of rows in the table and return it into
5059 * *totalrows. Note that *totaldeadrows is always set to 0.
5061 * Note that the returned list of rows is not always in order by physical
5062 * position in the table. Therefore, correlation estimates derived later
5063 * may be meaningless, but it's OK because we don't use the estimates
5064 * currently (the planner only pays attention to correlation for indexscans).
5066 static int
5067 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
5068 HeapTuple *rows, int targrows,
5069 double *totalrows,
5070 double *totaldeadrows)
5072 PgFdwAnalyzeState astate;
5073 ForeignTable *table;
5074 ForeignServer *server;
5075 UserMapping *user;
5076 PGconn *conn;
5077 int server_version_num;
5078 PgFdwSamplingMethod method = ANALYZE_SAMPLE_AUTO; /* auto is default */
5079 double sample_frac = -1.0;
5080 double reltuples;
5081 unsigned int cursor_number;
5082 StringInfoData sql;
5083 PGresult *volatile res = NULL;
5084 ListCell *lc;
5086 /* Initialize workspace state */
5087 astate.rel = relation;
5088 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
5090 astate.rows = rows;
5091 astate.targrows = targrows;
5092 astate.numrows = 0;
5093 astate.samplerows = 0;
5094 astate.rowstoskip = -1; /* -1 means not set yet */
5095 reservoir_init_selection_state(&astate.rstate, targrows);
5097 /* Remember ANALYZE context, and create a per-tuple temp context */
5098 astate.anl_cxt = CurrentMemoryContext;
5099 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
5100 "postgres_fdw temporary data",
5101 ALLOCSET_SMALL_SIZES);
5104 * Get the connection to use. We do the remote access as the table's
5105 * owner, even if the ANALYZE was started by some other user.
5107 table = GetForeignTable(RelationGetRelid(relation));
5108 server = GetForeignServer(table->serverid);
5109 user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
5110 conn = GetConnection(user, false, NULL);
5112 /* We'll need server version, so fetch it now. */
5113 server_version_num = PQserverVersion(conn);
5116 * What sampling method should we use?
5118 foreach(lc, server->options)
5120 DefElem *def = (DefElem *) lfirst(lc);
5122 if (strcmp(def->defname, "analyze_sampling") == 0)
5124 char *value = defGetString(def);
5126 if (strcmp(value, "off") == 0)
5127 method = ANALYZE_SAMPLE_OFF;
5128 else if (strcmp(value, "auto") == 0)
5129 method = ANALYZE_SAMPLE_AUTO;
5130 else if (strcmp(value, "random") == 0)
5131 method = ANALYZE_SAMPLE_RANDOM;
5132 else if (strcmp(value, "system") == 0)
5133 method = ANALYZE_SAMPLE_SYSTEM;
5134 else if (strcmp(value, "bernoulli") == 0)
5135 method = ANALYZE_SAMPLE_BERNOULLI;
5137 break;
5141 foreach(lc, table->options)
5143 DefElem *def = (DefElem *) lfirst(lc);
5145 if (strcmp(def->defname, "analyze_sampling") == 0)
5147 char *value = defGetString(def);
5149 if (strcmp(value, "off") == 0)
5150 method = ANALYZE_SAMPLE_OFF;
5151 else if (strcmp(value, "auto") == 0)
5152 method = ANALYZE_SAMPLE_AUTO;
5153 else if (strcmp(value, "random") == 0)
5154 method = ANALYZE_SAMPLE_RANDOM;
5155 else if (strcmp(value, "system") == 0)
5156 method = ANALYZE_SAMPLE_SYSTEM;
5157 else if (strcmp(value, "bernoulli") == 0)
5158 method = ANALYZE_SAMPLE_BERNOULLI;
5160 break;
5165 * Error-out if explicitly required one of the TABLESAMPLE methods, but
5166 * the server does not support it.
5168 if ((server_version_num < 95000) &&
5169 (method == ANALYZE_SAMPLE_SYSTEM ||
5170 method == ANALYZE_SAMPLE_BERNOULLI))
5171 ereport(ERROR,
5172 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5173 errmsg("remote server does not support TABLESAMPLE feature")));
5176 * If we've decided to do remote sampling, calculate the sampling rate. We
5177 * need to get the number of tuples from the remote server, but skip that
5178 * network round-trip if not needed.
5180 if (method != ANALYZE_SAMPLE_OFF)
5182 bool can_tablesample;
5184 reltuples = postgresGetAnalyzeInfoForForeignTable(relation,
5185 &can_tablesample);
5188 * Make sure we're not choosing TABLESAMPLE when the remote relation
5189 * does not support that. But only do this for "auto" - if the user
5190 * explicitly requested BERNOULLI/SYSTEM, it's better to fail.
5192 if (!can_tablesample && (method == ANALYZE_SAMPLE_AUTO))
5193 method = ANALYZE_SAMPLE_RANDOM;
5196 * Remote's reltuples could be 0 or -1 if the table has never been
5197 * vacuumed/analyzed. In that case, disable sampling after all.
5199 if ((reltuples <= 0) || (targrows >= reltuples))
5200 method = ANALYZE_SAMPLE_OFF;
5201 else
5204 * All supported sampling methods require sampling rate, not
5205 * target rows directly, so we calculate that using the remote
5206 * reltuples value. That's imperfect, because it might be off a
5207 * good deal, but that's not something we can (or should) address
5208 * here.
5210 * If reltuples is too low (i.e. when table grew), we'll end up
5211 * sampling more rows - but then we'll apply the local sampling,
5212 * so we get the expected sample size. This is the same outcome as
5213 * without remote sampling.
5215 * If reltuples is too high (e.g. after bulk DELETE), we will end
5216 * up sampling too few rows.
5218 * We can't really do much better here - we could try sampling a
5219 * bit more rows, but we don't know how off the reltuples value is
5220 * so how much is "a bit more"?
5222 * Furthermore, the targrows value for partitions is determined
5223 * based on table size (relpages), which can be off in different
5224 * ways too. Adjusting the sampling rate here might make the issue
5225 * worse.
5227 sample_frac = targrows / reltuples;
5230 * We should never get sampling rate outside the valid range
5231 * (between 0.0 and 1.0), because those cases should be covered by
5232 * the previous branch that sets ANALYZE_SAMPLE_OFF.
5234 Assert(sample_frac >= 0.0 && sample_frac <= 1.0);
5239 * For "auto" method, pick the one we believe is best. For servers with
5240 * TABLESAMPLE support we pick BERNOULLI, for old servers we fall-back to
5241 * random() to at least reduce network transfer.
5243 if (method == ANALYZE_SAMPLE_AUTO)
5245 if (server_version_num < 95000)
5246 method = ANALYZE_SAMPLE_RANDOM;
5247 else
5248 method = ANALYZE_SAMPLE_BERNOULLI;
5252 * Construct cursor that retrieves whole rows from remote.
5254 cursor_number = GetCursorNumber(conn);
5255 initStringInfo(&sql);
5256 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
5258 deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
5260 /* In what follows, do not risk leaking any PGresults. */
5261 PG_TRY();
5263 char fetch_sql[64];
5264 int fetch_size;
5266 res = pgfdw_exec_query(conn, sql.data, NULL);
5267 if (PQresultStatus(res) != PGRES_COMMAND_OK)
5268 pgfdw_report_error(ERROR, res, conn, false, sql.data);
5269 PQclear(res);
5270 res = NULL;
5273 * Determine the fetch size. The default is arbitrary, but shouldn't
5274 * be enormous.
5276 fetch_size = 100;
5277 foreach(lc, server->options)
5279 DefElem *def = (DefElem *) lfirst(lc);
5281 if (strcmp(def->defname, "fetch_size") == 0)
5283 (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5284 break;
5287 foreach(lc, table->options)
5289 DefElem *def = (DefElem *) lfirst(lc);
5291 if (strcmp(def->defname, "fetch_size") == 0)
5293 (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
5294 break;
5298 /* Construct command to fetch rows from remote. */
5299 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
5300 fetch_size, cursor_number);
5302 /* Retrieve and process rows a batch at a time. */
5303 for (;;)
5305 int numrows;
5306 int i;
5308 /* Allow users to cancel long query */
5309 CHECK_FOR_INTERRUPTS();
5312 * XXX possible future improvement: if rowstoskip is large, we
5313 * could issue a MOVE rather than physically fetching the rows,
5314 * then just adjust rowstoskip and samplerows appropriately.
5317 /* Fetch some rows */
5318 res = pgfdw_exec_query(conn, fetch_sql, NULL);
5319 /* On error, report the original query, not the FETCH. */
5320 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5321 pgfdw_report_error(ERROR, res, conn, false, sql.data);
5323 /* Process whatever we got. */
5324 numrows = PQntuples(res);
5325 for (i = 0; i < numrows; i++)
5326 analyze_row_processor(res, i, &astate);
5328 PQclear(res);
5329 res = NULL;
5331 /* Must be EOF if we didn't get all the rows requested. */
5332 if (numrows < fetch_size)
5333 break;
5336 /* Close the cursor, just to be tidy. */
5337 close_cursor(conn, cursor_number, NULL);
5339 PG_CATCH();
5341 PQclear(res);
5342 PG_RE_THROW();
5344 PG_END_TRY();
5346 ReleaseConnection(conn);
5348 /* We assume that we have no dead tuple. */
5349 *totaldeadrows = 0.0;
5352 * Without sampling, we've retrieved all living tuples from foreign
5353 * server, so report that as totalrows. Otherwise use the reltuples
5354 * estimate we got from the remote side.
5356 if (method == ANALYZE_SAMPLE_OFF)
5357 *totalrows = astate.samplerows;
5358 else
5359 *totalrows = reltuples;
5362 * Emit some interesting relation info
5364 ereport(elevel,
5365 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
5366 RelationGetRelationName(relation),
5367 *totalrows, astate.numrows)));
5369 return astate.numrows;
5373 * Collect sample rows from the result of query.
5374 * - Use all tuples in sample until target # of samples are collected.
5375 * - Subsequently, replace already-sampled tuples randomly.
5377 static void
5378 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
5380 int targrows = astate->targrows;
5381 int pos; /* array index to store tuple in */
5382 MemoryContext oldcontext;
5384 /* Always increment sample row counter. */
5385 astate->samplerows += 1;
5388 * Determine the slot where this sample row should be stored. Set pos to
5389 * negative value to indicate the row should be skipped.
5391 if (astate->numrows < targrows)
5393 /* First targrows rows are always included into the sample */
5394 pos = astate->numrows++;
5396 else
5399 * Now we start replacing tuples in the sample until we reach the end
5400 * of the relation. Same algorithm as in acquire_sample_rows in
5401 * analyze.c; see Jeff Vitter's paper.
5403 if (astate->rowstoskip < 0)
5404 astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows);
5406 if (astate->rowstoskip <= 0)
5408 /* Choose a random reservoir element to replace. */
5409 pos = (int) (targrows * sampler_random_fract(&astate->rstate.randstate));
5410 Assert(pos >= 0 && pos < targrows);
5411 heap_freetuple(astate->rows[pos]);
5413 else
5415 /* Skip this tuple. */
5416 pos = -1;
5419 astate->rowstoskip -= 1;
5422 if (pos >= 0)
5425 * Create sample tuple from current result row, and store it in the
5426 * position determined above. The tuple has to be created in anl_cxt.
5428 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
5430 astate->rows[pos] = make_tuple_from_result_row(res, row,
5431 astate->rel,
5432 astate->attinmeta,
5433 astate->retrieved_attrs,
5434 NULL,
5435 astate->temp_cxt);
5437 MemoryContextSwitchTo(oldcontext);
5442 * Import a foreign schema
5444 static List *
5445 postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
5447 List *commands = NIL;
5448 bool import_collate = true;
5449 bool import_default = false;
5450 bool import_generated = true;
5451 bool import_not_null = true;
5452 ForeignServer *server;
5453 UserMapping *mapping;
5454 PGconn *conn;
5455 StringInfoData buf;
5456 PGresult *volatile res = NULL;
5457 int numrows,
5459 ListCell *lc;
5461 /* Parse statement options */
5462 foreach(lc, stmt->options)
5464 DefElem *def = (DefElem *) lfirst(lc);
5466 if (strcmp(def->defname, "import_collate") == 0)
5467 import_collate = defGetBoolean(def);
5468 else if (strcmp(def->defname, "import_default") == 0)
5469 import_default = defGetBoolean(def);
5470 else if (strcmp(def->defname, "import_generated") == 0)
5471 import_generated = defGetBoolean(def);
5472 else if (strcmp(def->defname, "import_not_null") == 0)
5473 import_not_null = defGetBoolean(def);
5474 else
5475 ereport(ERROR,
5476 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
5477 errmsg("invalid option \"%s\"", def->defname)));
5481 * Get connection to the foreign server. Connection manager will
5482 * establish new connection if necessary.
5484 server = GetForeignServer(serverOid);
5485 mapping = GetUserMapping(GetUserId(), server->serverid);
5486 conn = GetConnection(mapping, false, NULL);
5488 /* Don't attempt to import collation if remote server hasn't got it */
5489 if (PQserverVersion(conn) < 90100)
5490 import_collate = false;
5492 /* Create workspace for strings */
5493 initStringInfo(&buf);
5495 /* In what follows, do not risk leaking any PGresults. */
5496 PG_TRY();
5498 /* Check that the schema really exists */
5499 appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
5500 deparseStringLiteral(&buf, stmt->remote_schema);
5502 res = pgfdw_exec_query(conn, buf.data, NULL);
5503 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5504 pgfdw_report_error(ERROR, res, conn, false, buf.data);
5506 if (PQntuples(res) != 1)
5507 ereport(ERROR,
5508 (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
5509 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
5510 stmt->remote_schema, server->servername)));
5512 PQclear(res);
5513 res = NULL;
5514 resetStringInfo(&buf);
5517 * Fetch all table data from this schema, possibly restricted by
5518 * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
5519 * to EXCEPT/LIMIT TO here, because the core code will filter the
5520 * statements we return according to those lists anyway. But it
5521 * should save a few cycles to not process excluded tables in the
5522 * first place.)
5524 * Import table data for partitions only when they are explicitly
5525 * specified in LIMIT TO clause. Otherwise ignore them and only
5526 * include the definitions of the root partitioned tables to allow
5527 * access to the complete remote data set locally in the schema
5528 * imported.
5530 * Note: because we run the connection with search_path restricted to
5531 * pg_catalog, the format_type() and pg_get_expr() outputs will always
5532 * include a schema name for types/functions in other schemas, which
5533 * is what we want.
5535 appendStringInfoString(&buf,
5536 "SELECT relname, "
5537 " attname, "
5538 " format_type(atttypid, atttypmod), "
5539 " attnotnull, "
5540 " pg_get_expr(adbin, adrelid), ");
5542 /* Generated columns are supported since Postgres 12 */
5543 if (PQserverVersion(conn) >= 120000)
5544 appendStringInfoString(&buf,
5545 " attgenerated, ");
5546 else
5547 appendStringInfoString(&buf,
5548 " NULL, ");
5550 if (import_collate)
5551 appendStringInfoString(&buf,
5552 " collname, "
5553 " collnsp.nspname ");
5554 else
5555 appendStringInfoString(&buf,
5556 " NULL, NULL ");
5558 appendStringInfoString(&buf,
5559 "FROM pg_class c "
5560 " JOIN pg_namespace n ON "
5561 " relnamespace = n.oid "
5562 " LEFT JOIN pg_attribute a ON "
5563 " attrelid = c.oid AND attnum > 0 "
5564 " AND NOT attisdropped "
5565 " LEFT JOIN pg_attrdef ad ON "
5566 " adrelid = c.oid AND adnum = attnum ");
5568 if (import_collate)
5569 appendStringInfoString(&buf,
5570 " LEFT JOIN pg_collation coll ON "
5571 " coll.oid = attcollation "
5572 " LEFT JOIN pg_namespace collnsp ON "
5573 " collnsp.oid = collnamespace ");
5575 appendStringInfoString(&buf,
5576 "WHERE c.relkind IN ("
5577 CppAsString2(RELKIND_RELATION) ","
5578 CppAsString2(RELKIND_VIEW) ","
5579 CppAsString2(RELKIND_FOREIGN_TABLE) ","
5580 CppAsString2(RELKIND_MATVIEW) ","
5581 CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
5582 " AND n.nspname = ");
5583 deparseStringLiteral(&buf, stmt->remote_schema);
5585 /* Partitions are supported since Postgres 10 */
5586 if (PQserverVersion(conn) >= 100000 &&
5587 stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
5588 appendStringInfoString(&buf, " AND NOT c.relispartition ");
5590 /* Apply restrictions for LIMIT TO and EXCEPT */
5591 if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
5592 stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
5594 bool first_item = true;
5596 appendStringInfoString(&buf, " AND c.relname ");
5597 if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
5598 appendStringInfoString(&buf, "NOT ");
5599 appendStringInfoString(&buf, "IN (");
5601 /* Append list of table names within IN clause */
5602 foreach(lc, stmt->table_list)
5604 RangeVar *rv = (RangeVar *) lfirst(lc);
5606 if (first_item)
5607 first_item = false;
5608 else
5609 appendStringInfoString(&buf, ", ");
5610 deparseStringLiteral(&buf, rv->relname);
5612 appendStringInfoChar(&buf, ')');
5615 /* Append ORDER BY at the end of query to ensure output ordering */
5616 appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
5618 /* Fetch the data */
5619 res = pgfdw_exec_query(conn, buf.data, NULL);
5620 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5621 pgfdw_report_error(ERROR, res, conn, false, buf.data);
5623 /* Process results */
5624 numrows = PQntuples(res);
5625 /* note: incrementation of i happens in inner loop's while() test */
5626 for (i = 0; i < numrows;)
5628 char *tablename = PQgetvalue(res, i, 0);
5629 bool first_item = true;
5631 resetStringInfo(&buf);
5632 appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
5633 quote_identifier(tablename));
5635 /* Scan all rows for this table */
5638 char *attname;
5639 char *typename;
5640 char *attnotnull;
5641 char *attgenerated;
5642 char *attdefault;
5643 char *collname;
5644 char *collnamespace;
5646 /* If table has no columns, we'll see nulls here */
5647 if (PQgetisnull(res, i, 1))
5648 continue;
5650 attname = PQgetvalue(res, i, 1);
5651 typename = PQgetvalue(res, i, 2);
5652 attnotnull = PQgetvalue(res, i, 3);
5653 attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
5654 PQgetvalue(res, i, 4);
5655 attgenerated = PQgetisnull(res, i, 5) ? (char *) NULL :
5656 PQgetvalue(res, i, 5);
5657 collname = PQgetisnull(res, i, 6) ? (char *) NULL :
5658 PQgetvalue(res, i, 6);
5659 collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL :
5660 PQgetvalue(res, i, 7);
5662 if (first_item)
5663 first_item = false;
5664 else
5665 appendStringInfoString(&buf, ",\n");
5667 /* Print column name and type */
5668 appendStringInfo(&buf, " %s %s",
5669 quote_identifier(attname),
5670 typename);
5673 * Add column_name option so that renaming the foreign table's
5674 * column doesn't break the association to the underlying
5675 * column.
5677 appendStringInfoString(&buf, " OPTIONS (column_name ");
5678 deparseStringLiteral(&buf, attname);
5679 appendStringInfoChar(&buf, ')');
5681 /* Add COLLATE if needed */
5682 if (import_collate && collname != NULL && collnamespace != NULL)
5683 appendStringInfo(&buf, " COLLATE %s.%s",
5684 quote_identifier(collnamespace),
5685 quote_identifier(collname));
5687 /* Add DEFAULT if needed */
5688 if (import_default && attdefault != NULL &&
5689 (!attgenerated || !attgenerated[0]))
5690 appendStringInfo(&buf, " DEFAULT %s", attdefault);
5692 /* Add GENERATED if needed */
5693 if (import_generated && attgenerated != NULL &&
5694 attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
5696 Assert(attdefault != NULL);
5697 appendStringInfo(&buf,
5698 " GENERATED ALWAYS AS (%s) STORED",
5699 attdefault);
5702 /* Add NOT NULL if needed */
5703 if (import_not_null && attnotnull[0] == 't')
5704 appendStringInfoString(&buf, " NOT NULL");
5706 while (++i < numrows &&
5707 strcmp(PQgetvalue(res, i, 0), tablename) == 0);
5710 * Add server name and table-level options. We specify remote
5711 * schema and table name as options (the latter to ensure that
5712 * renaming the foreign table doesn't break the association).
5714 appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
5715 quote_identifier(server->servername));
5717 appendStringInfoString(&buf, "schema_name ");
5718 deparseStringLiteral(&buf, stmt->remote_schema);
5719 appendStringInfoString(&buf, ", table_name ");
5720 deparseStringLiteral(&buf, tablename);
5722 appendStringInfoString(&buf, ");");
5724 commands = lappend(commands, pstrdup(buf.data));
5727 PG_FINALLY();
5729 PQclear(res);
5731 PG_END_TRY();
5733 ReleaseConnection(conn);
5735 return commands;
5739 * Check if reltarget is safe enough to push down semi-join. Reltarget is not
5740 * safe, if it contains references to inner rel relids, which do not belong to
5741 * outer rel.
5743 static bool
5744 semijoin_target_ok(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel, RelOptInfo *innerrel)
5746 List *vars;
5747 ListCell *lc;
5748 bool ok = true;
5750 Assert(joinrel->reltarget);
5752 vars = pull_var_clause((Node *) joinrel->reltarget->exprs, PVC_INCLUDE_PLACEHOLDERS);
5754 foreach(lc, vars)
5756 Var *var = (Var *) lfirst(lc);
5758 if (!IsA(var, Var))
5759 continue;
5761 if (bms_is_member(var->varno, innerrel->relids) &&
5762 !bms_is_member(var->varno, outerrel->relids))
5765 * The planner can create semi-join, which refers to inner rel
5766 * vars in its target list. However, we deparse semi-join as an
5767 * exists() subquery, so can't handle references to inner rel in
5768 * the target list.
5770 ok = false;
5771 break;
5774 return ok;
5778 * Assess whether the join between inner and outer relations can be pushed down
5779 * to the foreign server. As a side effect, save information we obtain in this
5780 * function to PgFdwRelationInfo passed in.
5782 static bool
5783 foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
5784 RelOptInfo *outerrel, RelOptInfo *innerrel,
5785 JoinPathExtraData *extra)
5787 PgFdwRelationInfo *fpinfo;
5788 PgFdwRelationInfo *fpinfo_o;
5789 PgFdwRelationInfo *fpinfo_i;
5790 ListCell *lc;
5791 List *joinclauses;
5794 * We support pushing down INNER, LEFT, RIGHT, FULL OUTER and SEMI joins.
5795 * Constructing queries representing ANTI joins is hard, hence not
5796 * considered right now.
5798 if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
5799 jointype != JOIN_RIGHT && jointype != JOIN_FULL &&
5800 jointype != JOIN_SEMI)
5801 return false;
5804 * We can't push down semi-join if its reltarget is not safe
5806 if ((jointype == JOIN_SEMI) && !semijoin_target_ok(root, joinrel, outerrel, innerrel))
5807 return false;
5810 * If either of the joining relations is marked as unsafe to pushdown, the
5811 * join can not be pushed down.
5813 fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
5814 fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
5815 fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
5816 if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
5817 !fpinfo_i || !fpinfo_i->pushdown_safe)
5818 return false;
5821 * If joining relations have local conditions, those conditions are
5822 * required to be applied before joining the relations. Hence the join can
5823 * not be pushed down.
5825 if (fpinfo_o->local_conds || fpinfo_i->local_conds)
5826 return false;
5829 * Merge FDW options. We might be tempted to do this after we have deemed
5830 * the foreign join to be OK. But we must do this beforehand so that we
5831 * know which quals can be evaluated on the foreign server, which might
5832 * depend on shippable_extensions.
5834 fpinfo->server = fpinfo_o->server;
5835 merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i);
5838 * Separate restrict list into join quals and pushed-down (other) quals.
5840 * Join quals belonging to an outer join must all be shippable, else we
5841 * cannot execute the join remotely. Add such quals to 'joinclauses'.
5843 * Add other quals to fpinfo->remote_conds if they are shippable, else to
5844 * fpinfo->local_conds. In an inner join it's okay to execute conditions
5845 * either locally or remotely; the same is true for pushed-down conditions
5846 * at an outer join.
5848 * Note we might return failure after having already scribbled on
5849 * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
5850 * won't consult those lists again if we deem the join unshippable.
5852 joinclauses = NIL;
5853 foreach(lc, extra->restrictlist)
5855 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
5856 bool is_remote_clause = is_foreign_expr(root, joinrel,
5857 rinfo->clause);
5859 if (IS_OUTER_JOIN(jointype) &&
5860 !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
5862 if (!is_remote_clause)
5863 return false;
5864 joinclauses = lappend(joinclauses, rinfo);
5866 else
5868 if (is_remote_clause)
5869 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
5870 else
5871 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
5876 * deparseExplicitTargetList() isn't smart enough to handle anything other
5877 * than a Var. In particular, if there's some PlaceHolderVar that would
5878 * need to be evaluated within this join tree (because there's an upper
5879 * reference to a quantity that may go to NULL as a result of an outer
5880 * join), then we can't try to push the join down because we'll fail when
5881 * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
5882 * needs to be evaluated *at the top* of this join tree is OK, because we
5883 * can do that locally after fetching the results from the remote side.
5885 foreach(lc, root->placeholder_list)
5887 PlaceHolderInfo *phinfo = lfirst(lc);
5888 Relids relids;
5890 /* PlaceHolderInfo refers to parent relids, not child relids. */
5891 relids = IS_OTHER_REL(joinrel) ?
5892 joinrel->top_parent_relids : joinrel->relids;
5894 if (bms_is_subset(phinfo->ph_eval_at, relids) &&
5895 bms_nonempty_difference(relids, phinfo->ph_eval_at))
5896 return false;
5899 /* Save the join clauses, for later use. */
5900 fpinfo->joinclauses = joinclauses;
5902 fpinfo->outerrel = outerrel;
5903 fpinfo->innerrel = innerrel;
5904 fpinfo->jointype = jointype;
5907 * By default, both the input relations are not required to be deparsed as
5908 * subqueries, but there might be some relations covered by the input
5909 * relations that are required to be deparsed as subqueries, so save the
5910 * relids of those relations for later use by the deparser.
5912 fpinfo->make_outerrel_subquery = false;
5913 fpinfo->make_innerrel_subquery = false;
5914 Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
5915 Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
5916 fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels,
5917 fpinfo_i->lower_subquery_rels);
5918 fpinfo->hidden_subquery_rels = bms_union(fpinfo_o->hidden_subquery_rels,
5919 fpinfo_i->hidden_subquery_rels);
5922 * Pull the other remote conditions from the joining relations into join
5923 * clauses or other remote clauses (remote_conds) of this relation
5924 * wherever possible. This avoids building subqueries at every join step.
5926 * For an inner join, clauses from both the relations are added to the
5927 * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
5928 * the outer side are added to remote_conds since those can be evaluated
5929 * after the join is evaluated. The clauses from inner side are added to
5930 * the joinclauses, since they need to be evaluated while constructing the
5931 * join.
5933 * For SEMI-JOIN clauses from inner relation can not be added to
5934 * remote_conds, but should be treated as join clauses (as they are
5935 * deparsed to EXISTS subquery, where inner relation can be referred). A
5936 * list of relation ids, which can't be referred to from higher levels, is
5937 * preserved as a hidden_subquery_rels list.
5939 * For a FULL OUTER JOIN, the other clauses from either relation can not
5940 * be added to the joinclauses or remote_conds, since each relation acts
5941 * as an outer relation for the other.
5943 * The joining sides can not have local conditions, thus no need to test
5944 * shippability of the clauses being pulled up.
5946 switch (jointype)
5948 case JOIN_INNER:
5949 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5950 fpinfo_i->remote_conds);
5951 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5952 fpinfo_o->remote_conds);
5953 break;
5955 case JOIN_LEFT:
5956 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5957 fpinfo_i->remote_conds);
5958 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5959 fpinfo_o->remote_conds);
5960 break;
5962 case JOIN_RIGHT:
5963 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5964 fpinfo_o->remote_conds);
5965 fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
5966 fpinfo_i->remote_conds);
5967 break;
5969 case JOIN_SEMI:
5970 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5971 fpinfo_i->remote_conds);
5972 fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
5973 fpinfo->remote_conds);
5974 fpinfo->remote_conds = list_copy(fpinfo_o->remote_conds);
5975 fpinfo->hidden_subquery_rels = bms_union(fpinfo->hidden_subquery_rels,
5976 innerrel->relids);
5977 break;
5979 case JOIN_FULL:
5982 * In this case, if any of the input relations has conditions, we
5983 * need to deparse that relation as a subquery so that the
5984 * conditions can be evaluated before the join. Remember it in
5985 * the fpinfo of this relation so that the deparser can take
5986 * appropriate action. Also, save the relids of base relations
5987 * covered by that relation for later use by the deparser.
5989 if (fpinfo_o->remote_conds)
5991 fpinfo->make_outerrel_subquery = true;
5992 fpinfo->lower_subquery_rels =
5993 bms_add_members(fpinfo->lower_subquery_rels,
5994 outerrel->relids);
5996 if (fpinfo_i->remote_conds)
5998 fpinfo->make_innerrel_subquery = true;
5999 fpinfo->lower_subquery_rels =
6000 bms_add_members(fpinfo->lower_subquery_rels,
6001 innerrel->relids);
6003 break;
6005 default:
6006 /* Should not happen, we have just checked this above */
6007 elog(ERROR, "unsupported join type %d", jointype);
6011 * For an inner join, all restrictions can be treated alike. Treating the
6012 * pushed down conditions as join conditions allows a top level full outer
6013 * join to be deparsed without requiring subqueries.
6015 if (jointype == JOIN_INNER)
6017 Assert(!fpinfo->joinclauses);
6018 fpinfo->joinclauses = fpinfo->remote_conds;
6019 fpinfo->remote_conds = NIL;
6021 else if (jointype == JOIN_LEFT || jointype == JOIN_RIGHT || jointype == JOIN_FULL)
6024 * Conditions, generated from semi-joins, should be evaluated before
6025 * LEFT/RIGHT/FULL join.
6027 if (!bms_is_empty(fpinfo_o->hidden_subquery_rels))
6029 fpinfo->make_outerrel_subquery = true;
6030 fpinfo->lower_subquery_rels = bms_add_members(fpinfo->lower_subquery_rels, outerrel->relids);
6033 if (!bms_is_empty(fpinfo_i->hidden_subquery_rels))
6035 fpinfo->make_innerrel_subquery = true;
6036 fpinfo->lower_subquery_rels = bms_add_members(fpinfo->lower_subquery_rels, innerrel->relids);
6040 /* Mark that this join can be pushed down safely */
6041 fpinfo->pushdown_safe = true;
6043 /* Get user mapping */
6044 if (fpinfo->use_remote_estimate)
6046 if (fpinfo_o->use_remote_estimate)
6047 fpinfo->user = fpinfo_o->user;
6048 else
6049 fpinfo->user = fpinfo_i->user;
6051 else
6052 fpinfo->user = NULL;
6055 * Set # of retrieved rows and cached relation costs to some negative
6056 * value, so that we can detect when they are set to some sensible values,
6057 * during one (usually the first) of the calls to estimate_path_cost_size.
6059 fpinfo->retrieved_rows = -1;
6060 fpinfo->rel_startup_cost = -1;
6061 fpinfo->rel_total_cost = -1;
6064 * Set the string describing this join relation to be used in EXPLAIN
6065 * output of corresponding ForeignScan. Note that the decoration we add
6066 * to the base relation names mustn't include any digits, or it'll confuse
6067 * postgresExplainForeignScan.
6069 fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)",
6070 fpinfo_o->relation_name,
6071 get_jointype_name(fpinfo->jointype),
6072 fpinfo_i->relation_name);
6075 * Set the relation index. This is defined as the position of this
6076 * joinrel in the join_rel_list list plus the length of the rtable list.
6077 * Note that since this joinrel is at the end of the join_rel_list list
6078 * when we are called, we can get the position by list_length.
6080 Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */
6081 fpinfo->relation_index =
6082 list_length(root->parse->rtable) + list_length(root->join_rel_list);
6084 return true;
6087 static void
6088 add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
6089 Path *epq_path, List *restrictlist)
6091 List *useful_pathkeys_list = NIL; /* List of all pathkeys */
6092 ListCell *lc;
6094 useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel);
6097 * Before creating sorted paths, arrange for the passed-in EPQ path, if
6098 * any, to return columns needed by the parent ForeignScan node so that
6099 * they will propagate up through Sort nodes injected below, if necessary.
6101 if (epq_path != NULL && useful_pathkeys_list != NIL)
6103 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
6104 PathTarget *target = copy_pathtarget(epq_path->pathtarget);
6106 /* Include columns required for evaluating PHVs in the tlist. */
6107 add_new_columns_to_pathtarget(target,
6108 pull_var_clause((Node *) target->exprs,
6109 PVC_RECURSE_PLACEHOLDERS));
6111 /* Include columns required for evaluating the local conditions. */
6112 foreach(lc, fpinfo->local_conds)
6114 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
6116 add_new_columns_to_pathtarget(target,
6117 pull_var_clause((Node *) rinfo->clause,
6118 PVC_RECURSE_PLACEHOLDERS));
6122 * If we have added any new columns, adjust the tlist of the EPQ path.
6124 * Note: the plan created using this path will only be used to execute
6125 * EPQ checks, where accuracy of the plan cost and width estimates
6126 * would not be important, so we do not do set_pathtarget_cost_width()
6127 * for the new pathtarget here. See also postgresGetForeignPlan().
6129 if (list_length(target->exprs) > list_length(epq_path->pathtarget->exprs))
6131 /* The EPQ path is a join path, so it is projection-capable. */
6132 Assert(is_projection_capable_path(epq_path));
6135 * Use create_projection_path() here, so as to avoid modifying it
6136 * in place.
6138 epq_path = (Path *) create_projection_path(root,
6139 rel,
6140 epq_path,
6141 target);
6145 /* Create one path for each set of pathkeys we found above. */
6146 foreach(lc, useful_pathkeys_list)
6148 double rows;
6149 int width;
6150 Cost startup_cost;
6151 Cost total_cost;
6152 List *useful_pathkeys = lfirst(lc);
6153 Path *sorted_epq_path;
6155 estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL,
6156 &rows, &width, &startup_cost, &total_cost);
6159 * The EPQ path must be at least as well sorted as the path itself, in
6160 * case it gets used as input to a mergejoin.
6162 sorted_epq_path = epq_path;
6163 if (sorted_epq_path != NULL &&
6164 !pathkeys_contained_in(useful_pathkeys,
6165 sorted_epq_path->pathkeys))
6166 sorted_epq_path = (Path *)
6167 create_sort_path(root,
6168 rel,
6169 sorted_epq_path,
6170 useful_pathkeys,
6171 -1.0);
6173 if (IS_SIMPLE_REL(rel))
6174 add_path(rel, (Path *)
6175 create_foreignscan_path(root, rel,
6176 NULL,
6177 rows,
6178 startup_cost,
6179 total_cost,
6180 useful_pathkeys,
6181 rel->lateral_relids,
6182 sorted_epq_path,
6183 NIL, /* no fdw_restrictinfo
6184 * list */
6185 NIL));
6186 else
6187 add_path(rel, (Path *)
6188 create_foreign_join_path(root, rel,
6189 NULL,
6190 rows,
6191 startup_cost,
6192 total_cost,
6193 useful_pathkeys,
6194 rel->lateral_relids,
6195 sorted_epq_path,
6196 restrictlist,
6197 NIL));
6202 * Parse options from foreign server and apply them to fpinfo.
6204 * New options might also require tweaking merge_fdw_options().
6206 static void
6207 apply_server_options(PgFdwRelationInfo *fpinfo)
6209 ListCell *lc;
6211 foreach(lc, fpinfo->server->options)
6213 DefElem *def = (DefElem *) lfirst(lc);
6215 if (strcmp(def->defname, "use_remote_estimate") == 0)
6216 fpinfo->use_remote_estimate = defGetBoolean(def);
6217 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
6218 (void) parse_real(defGetString(def), &fpinfo->fdw_startup_cost, 0,
6219 NULL);
6220 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
6221 (void) parse_real(defGetString(def), &fpinfo->fdw_tuple_cost, 0,
6222 NULL);
6223 else if (strcmp(def->defname, "extensions") == 0)
6224 fpinfo->shippable_extensions =
6225 ExtractExtensionList(defGetString(def), false);
6226 else if (strcmp(def->defname, "fetch_size") == 0)
6227 (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
6228 else if (strcmp(def->defname, "async_capable") == 0)
6229 fpinfo->async_capable = defGetBoolean(def);
6234 * Parse options from foreign table and apply them to fpinfo.
6236 * New options might also require tweaking merge_fdw_options().
6238 static void
6239 apply_table_options(PgFdwRelationInfo *fpinfo)
6241 ListCell *lc;
6243 foreach(lc, fpinfo->table->options)
6245 DefElem *def = (DefElem *) lfirst(lc);
6247 if (strcmp(def->defname, "use_remote_estimate") == 0)
6248 fpinfo->use_remote_estimate = defGetBoolean(def);
6249 else if (strcmp(def->defname, "fetch_size") == 0)
6250 (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
6251 else if (strcmp(def->defname, "async_capable") == 0)
6252 fpinfo->async_capable = defGetBoolean(def);
6257 * Merge FDW options from input relations into a new set of options for a join
6258 * or an upper rel.
6260 * For a join relation, FDW-specific information about the inner and outer
6261 * relations is provided using fpinfo_i and fpinfo_o. For an upper relation,
6262 * fpinfo_o provides the information for the input relation; fpinfo_i is
6263 * expected to NULL.
6265 static void
6266 merge_fdw_options(PgFdwRelationInfo *fpinfo,
6267 const PgFdwRelationInfo *fpinfo_o,
6268 const PgFdwRelationInfo *fpinfo_i)
6270 /* We must always have fpinfo_o. */
6271 Assert(fpinfo_o);
6273 /* fpinfo_i may be NULL, but if present the servers must both match. */
6274 Assert(!fpinfo_i ||
6275 fpinfo_i->server->serverid == fpinfo_o->server->serverid);
6278 * Copy the server specific FDW options. (For a join, both relations come
6279 * from the same server, so the server options should have the same value
6280 * for both relations.)
6282 fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
6283 fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
6284 fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
6285 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
6286 fpinfo->fetch_size = fpinfo_o->fetch_size;
6287 fpinfo->async_capable = fpinfo_o->async_capable;
6289 /* Merge the table level options from either side of the join. */
6290 if (fpinfo_i)
6293 * We'll prefer to use remote estimates for this join if any table
6294 * from either side of the join is using remote estimates. This is
6295 * most likely going to be preferred since they're already willing to
6296 * pay the price of a round trip to get the remote EXPLAIN. In any
6297 * case it's not entirely clear how we might otherwise handle this
6298 * best.
6300 fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
6301 fpinfo_i->use_remote_estimate;
6304 * Set fetch size to maximum of the joining sides, since we are
6305 * expecting the rows returned by the join to be proportional to the
6306 * relation sizes.
6308 fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
6311 * We'll prefer to consider this join async-capable if any table from
6312 * either side of the join is considered async-capable. This would be
6313 * reasonable because in that case the foreign server would have its
6314 * own resources to scan that table asynchronously, and the join could
6315 * also be computed asynchronously using the resources.
6317 fpinfo->async_capable = fpinfo_o->async_capable ||
6318 fpinfo_i->async_capable;
6323 * postgresGetForeignJoinPaths
6324 * Add possible ForeignPath to joinrel, if join is safe to push down.
6326 static void
6327 postgresGetForeignJoinPaths(PlannerInfo *root,
6328 RelOptInfo *joinrel,
6329 RelOptInfo *outerrel,
6330 RelOptInfo *innerrel,
6331 JoinType jointype,
6332 JoinPathExtraData *extra)
6334 PgFdwRelationInfo *fpinfo;
6335 ForeignPath *joinpath;
6336 double rows;
6337 int width;
6338 Cost startup_cost;
6339 Cost total_cost;
6340 Path *epq_path; /* Path to create plan to be executed when
6341 * EvalPlanQual gets triggered. */
6344 * Skip if this join combination has been considered already.
6346 if (joinrel->fdw_private)
6347 return;
6350 * This code does not work for joins with lateral references, since those
6351 * must have parameterized paths, which we don't generate yet.
6353 if (!bms_is_empty(joinrel->lateral_relids))
6354 return;
6357 * Create unfinished PgFdwRelationInfo entry which is used to indicate
6358 * that the join relation is already considered, so that we won't waste
6359 * time in judging safety of join pushdown and adding the same paths again
6360 * if found safe. Once we know that this join can be pushed down, we fill
6361 * the entry.
6363 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
6364 fpinfo->pushdown_safe = false;
6365 joinrel->fdw_private = fpinfo;
6366 /* attrs_used is only for base relations. */
6367 fpinfo->attrs_used = NULL;
6370 * If there is a possibility that EvalPlanQual will be executed, we need
6371 * to be able to reconstruct the row using scans of the base relations.
6372 * GetExistingLocalJoinPath will find a suitable path for this purpose in
6373 * the path list of the joinrel, if one exists. We must be careful to
6374 * call it before adding any ForeignPath, since the ForeignPath might
6375 * dominate the only suitable local path available. We also do it before
6376 * calling foreign_join_ok(), since that function updates fpinfo and marks
6377 * it as pushable if the join is found to be pushable.
6379 if (root->parse->commandType == CMD_DELETE ||
6380 root->parse->commandType == CMD_UPDATE ||
6381 root->rowMarks)
6383 epq_path = GetExistingLocalJoinPath(joinrel);
6384 if (!epq_path)
6386 elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
6387 return;
6390 else
6391 epq_path = NULL;
6393 if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
6395 /* Free path required for EPQ if we copied one; we don't need it now */
6396 if (epq_path)
6397 pfree(epq_path);
6398 return;
6402 * Compute the selectivity and cost of the local_conds, so we don't have
6403 * to do it over again for each path. The best we can do for these
6404 * conditions is to estimate selectivity on the basis of local statistics.
6405 * The local conditions are applied after the join has been computed on
6406 * the remote side like quals in WHERE clause, so pass jointype as
6407 * JOIN_INNER.
6409 fpinfo->local_conds_sel = clauselist_selectivity(root,
6410 fpinfo->local_conds,
6412 JOIN_INNER,
6413 NULL);
6414 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
6417 * If we are going to estimate costs locally, estimate the join clause
6418 * selectivity here while we have special join info.
6420 if (!fpinfo->use_remote_estimate)
6421 fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
6422 0, fpinfo->jointype,
6423 extra->sjinfo);
6425 /* Estimate costs for bare join relation */
6426 estimate_path_cost_size(root, joinrel, NIL, NIL, NULL,
6427 &rows, &width, &startup_cost, &total_cost);
6428 /* Now update this information in the joinrel */
6429 joinrel->rows = rows;
6430 joinrel->reltarget->width = width;
6431 fpinfo->rows = rows;
6432 fpinfo->width = width;
6433 fpinfo->startup_cost = startup_cost;
6434 fpinfo->total_cost = total_cost;
6437 * Create a new join path and add it to the joinrel which represents a
6438 * join between foreign tables.
6440 joinpath = create_foreign_join_path(root,
6441 joinrel,
6442 NULL, /* default pathtarget */
6443 rows,
6444 startup_cost,
6445 total_cost,
6446 NIL, /* no pathkeys */
6447 joinrel->lateral_relids,
6448 epq_path,
6449 extra->restrictlist,
6450 NIL); /* no fdw_private */
6452 /* Add generated path into joinrel by add_path(). */
6453 add_path(joinrel, (Path *) joinpath);
6455 /* Consider pathkeys for the join relation */
6456 add_paths_with_pathkeys_for_rel(root, joinrel, epq_path,
6457 extra->restrictlist);
6459 /* XXX Consider parameterized paths for the join relation */
6463 * Assess whether the aggregation, grouping and having operations can be pushed
6464 * down to the foreign server. As a side effect, save information we obtain in
6465 * this function to PgFdwRelationInfo of the input relation.
6467 static bool
6468 foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
6469 Node *havingQual)
6471 Query *query = root->parse;
6472 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
6473 PathTarget *grouping_target = grouped_rel->reltarget;
6474 PgFdwRelationInfo *ofpinfo;
6475 ListCell *lc;
6476 int i;
6477 List *tlist = NIL;
6479 /* We currently don't support pushing Grouping Sets. */
6480 if (query->groupingSets)
6481 return false;
6483 /* Get the fpinfo of the underlying scan relation. */
6484 ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
6487 * If underlying scan relation has any local conditions, those conditions
6488 * are required to be applied before performing aggregation. Hence the
6489 * aggregate cannot be pushed down.
6491 if (ofpinfo->local_conds)
6492 return false;
6495 * Examine grouping expressions, as well as other expressions we'd need to
6496 * compute, and check whether they are safe to push down to the foreign
6497 * server. All GROUP BY expressions will be part of the grouping target
6498 * and thus there is no need to search for them separately. Add grouping
6499 * expressions into target list which will be passed to foreign server.
6501 * A tricky fine point is that we must not put any expression into the
6502 * target list that is just a foreign param (that is, something that
6503 * deparse.c would conclude has to be sent to the foreign server). If we
6504 * do, the expression will also appear in the fdw_exprs list of the plan
6505 * node, and setrefs.c will get confused and decide that the fdw_exprs
6506 * entry is actually a reference to the fdw_scan_tlist entry, resulting in
6507 * a broken plan. Somewhat oddly, it's OK if the expression contains such
6508 * a node, as long as it's not at top level; then no match is possible.
6510 i = 0;
6511 foreach(lc, grouping_target->exprs)
6513 Expr *expr = (Expr *) lfirst(lc);
6514 Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
6515 ListCell *l;
6518 * Check whether this expression is part of GROUP BY clause. Note we
6519 * check the whole GROUP BY clause not just processed_groupClause,
6520 * because we will ship all of it, cf. appendGroupByClause.
6522 if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
6524 TargetEntry *tle;
6527 * If any GROUP BY expression is not shippable, then we cannot
6528 * push down aggregation to the foreign server.
6530 if (!is_foreign_expr(root, grouped_rel, expr))
6531 return false;
6534 * If it would be a foreign param, we can't put it into the tlist,
6535 * so we have to fail.
6537 if (is_foreign_param(root, grouped_rel, expr))
6538 return false;
6541 * Pushable, so add to tlist. We need to create a TLE for this
6542 * expression and apply the sortgroupref to it. We cannot use
6543 * add_to_flat_tlist() here because that avoids making duplicate
6544 * entries in the tlist. If there are duplicate entries with
6545 * distinct sortgrouprefs, we have to duplicate that situation in
6546 * the output tlist.
6548 tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false);
6549 tle->ressortgroupref = sgref;
6550 tlist = lappend(tlist, tle);
6552 else
6555 * Non-grouping expression we need to compute. Can we ship it
6556 * as-is to the foreign server?
6558 if (is_foreign_expr(root, grouped_rel, expr) &&
6559 !is_foreign_param(root, grouped_rel, expr))
6561 /* Yes, so add to tlist as-is; OK to suppress duplicates */
6562 tlist = add_to_flat_tlist(tlist, list_make1(expr));
6564 else
6566 /* Not pushable as a whole; extract its Vars and aggregates */
6567 List *aggvars;
6569 aggvars = pull_var_clause((Node *) expr,
6570 PVC_INCLUDE_AGGREGATES);
6573 * If any aggregate expression is not shippable, then we
6574 * cannot push down aggregation to the foreign server. (We
6575 * don't have to check is_foreign_param, since that certainly
6576 * won't return true for any such expression.)
6578 if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
6579 return false;
6582 * Add aggregates, if any, into the targetlist. Plain Vars
6583 * outside an aggregate can be ignored, because they should be
6584 * either same as some GROUP BY column or part of some GROUP
6585 * BY expression. In either case, they are already part of
6586 * the targetlist and thus no need to add them again. In fact
6587 * including plain Vars in the tlist when they do not match a
6588 * GROUP BY column would cause the foreign server to complain
6589 * that the shipped query is invalid.
6591 foreach(l, aggvars)
6593 Expr *aggref = (Expr *) lfirst(l);
6595 if (IsA(aggref, Aggref))
6596 tlist = add_to_flat_tlist(tlist, list_make1(aggref));
6601 i++;
6605 * Classify the pushable and non-pushable HAVING clauses and save them in
6606 * remote_conds and local_conds of the grouped rel's fpinfo.
6608 if (havingQual)
6610 foreach(lc, (List *) havingQual)
6612 Expr *expr = (Expr *) lfirst(lc);
6613 RestrictInfo *rinfo;
6616 * Currently, the core code doesn't wrap havingQuals in
6617 * RestrictInfos, so we must make our own.
6619 Assert(!IsA(expr, RestrictInfo));
6620 rinfo = make_restrictinfo(root,
6621 expr,
6622 true,
6623 false,
6624 false,
6625 false,
6626 root->qual_security_level,
6627 grouped_rel->relids,
6628 NULL,
6629 NULL);
6630 if (is_foreign_expr(root, grouped_rel, expr))
6631 fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
6632 else
6633 fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
6638 * If there are any local conditions, pull Vars and aggregates from it and
6639 * check whether they are safe to pushdown or not.
6641 if (fpinfo->local_conds)
6643 List *aggvars = NIL;
6645 foreach(lc, fpinfo->local_conds)
6647 RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
6649 aggvars = list_concat(aggvars,
6650 pull_var_clause((Node *) rinfo->clause,
6651 PVC_INCLUDE_AGGREGATES));
6654 foreach(lc, aggvars)
6656 Expr *expr = (Expr *) lfirst(lc);
6659 * If aggregates within local conditions are not safe to push
6660 * down, then we cannot push down the query. Vars are already
6661 * part of GROUP BY clause which are checked above, so no need to
6662 * access them again here. Again, we need not check
6663 * is_foreign_param for a foreign aggregate.
6665 if (IsA(expr, Aggref))
6667 if (!is_foreign_expr(root, grouped_rel, expr))
6668 return false;
6670 tlist = add_to_flat_tlist(tlist, list_make1(expr));
6675 /* Store generated targetlist */
6676 fpinfo->grouped_tlist = tlist;
6678 /* Safe to pushdown */
6679 fpinfo->pushdown_safe = true;
6682 * Set # of retrieved rows and cached relation costs to some negative
6683 * value, so that we can detect when they are set to some sensible values,
6684 * during one (usually the first) of the calls to estimate_path_cost_size.
6686 fpinfo->retrieved_rows = -1;
6687 fpinfo->rel_startup_cost = -1;
6688 fpinfo->rel_total_cost = -1;
6691 * Set the string describing this grouped relation to be used in EXPLAIN
6692 * output of corresponding ForeignScan. Note that the decoration we add
6693 * to the base relation name mustn't include any digits, or it'll confuse
6694 * postgresExplainForeignScan.
6696 fpinfo->relation_name = psprintf("Aggregate on (%s)",
6697 ofpinfo->relation_name);
6699 return true;
6703 * postgresGetForeignUpperPaths
6704 * Add paths for post-join operations like aggregation, grouping etc. if
6705 * corresponding operations are safe to push down.
6707 static void
6708 postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
6709 RelOptInfo *input_rel, RelOptInfo *output_rel,
6710 void *extra)
6712 PgFdwRelationInfo *fpinfo;
6715 * If input rel is not safe to pushdown, then simply return as we cannot
6716 * perform any post-join operations on the foreign server.
6718 if (!input_rel->fdw_private ||
6719 !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
6720 return;
6722 /* Ignore stages we don't support; and skip any duplicate calls. */
6723 if ((stage != UPPERREL_GROUP_AGG &&
6724 stage != UPPERREL_ORDERED &&
6725 stage != UPPERREL_FINAL) ||
6726 output_rel->fdw_private)
6727 return;
6729 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
6730 fpinfo->pushdown_safe = false;
6731 fpinfo->stage = stage;
6732 output_rel->fdw_private = fpinfo;
6734 switch (stage)
6736 case UPPERREL_GROUP_AGG:
6737 add_foreign_grouping_paths(root, input_rel, output_rel,
6738 (GroupPathExtraData *) extra);
6739 break;
6740 case UPPERREL_ORDERED:
6741 add_foreign_ordered_paths(root, input_rel, output_rel);
6742 break;
6743 case UPPERREL_FINAL:
6744 add_foreign_final_paths(root, input_rel, output_rel,
6745 (FinalPathExtraData *) extra);
6746 break;
6747 default:
6748 elog(ERROR, "unexpected upper relation: %d", (int) stage);
6749 break;
6754 * add_foreign_grouping_paths
6755 * Add foreign path for grouping and/or aggregation.
6757 * Given input_rel represents the underlying scan. The paths are added to the
6758 * given grouped_rel.
6760 static void
6761 add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
6762 RelOptInfo *grouped_rel,
6763 GroupPathExtraData *extra)
6765 Query *parse = root->parse;
6766 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
6767 PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
6768 ForeignPath *grouppath;
6769 double rows;
6770 int width;
6771 Cost startup_cost;
6772 Cost total_cost;
6774 /* Nothing to be done, if there is no grouping or aggregation required. */
6775 if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
6776 !root->hasHavingQual)
6777 return;
6779 Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
6780 extra->patype == PARTITIONWISE_AGGREGATE_FULL);
6782 /* save the input_rel as outerrel in fpinfo */
6783 fpinfo->outerrel = input_rel;
6786 * Copy foreign table, foreign server, user mapping, FDW options etc.
6787 * details from the input relation's fpinfo.
6789 fpinfo->table = ifpinfo->table;
6790 fpinfo->server = ifpinfo->server;
6791 fpinfo->user = ifpinfo->user;
6792 merge_fdw_options(fpinfo, ifpinfo, NULL);
6795 * Assess if it is safe to push down aggregation and grouping.
6797 * Use HAVING qual from extra. In case of child partition, it will have
6798 * translated Vars.
6800 if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
6801 return;
6804 * Compute the selectivity and cost of the local_conds, so we don't have
6805 * to do it over again for each path. (Currently we create just a single
6806 * path here, but in future it would be possible that we build more paths
6807 * such as pre-sorted paths as in postgresGetForeignPaths and
6808 * postgresGetForeignJoinPaths.) The best we can do for these conditions
6809 * is to estimate selectivity on the basis of local statistics.
6811 fpinfo->local_conds_sel = clauselist_selectivity(root,
6812 fpinfo->local_conds,
6814 JOIN_INNER,
6815 NULL);
6817 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
6819 /* Estimate the cost of push down */
6820 estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL,
6821 &rows, &width, &startup_cost, &total_cost);
6823 /* Now update this information in the fpinfo */
6824 fpinfo->rows = rows;
6825 fpinfo->width = width;
6826 fpinfo->startup_cost = startup_cost;
6827 fpinfo->total_cost = total_cost;
6829 /* Create and add foreign path to the grouping relation. */
6830 grouppath = create_foreign_upper_path(root,
6831 grouped_rel,
6832 grouped_rel->reltarget,
6833 rows,
6834 startup_cost,
6835 total_cost,
6836 NIL, /* no pathkeys */
6837 NULL,
6838 NIL, /* no fdw_restrictinfo list */
6839 NIL); /* no fdw_private */
6841 /* Add generated path into grouped_rel by add_path(). */
6842 add_path(grouped_rel, (Path *) grouppath);
6846 * add_foreign_ordered_paths
6847 * Add foreign paths for performing the final sort remotely.
6849 * Given input_rel contains the source-data Paths. The paths are added to the
6850 * given ordered_rel.
6852 static void
6853 add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel,
6854 RelOptInfo *ordered_rel)
6856 Query *parse = root->parse;
6857 PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
6858 PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private;
6859 PgFdwPathExtraData *fpextra;
6860 double rows;
6861 int width;
6862 Cost startup_cost;
6863 Cost total_cost;
6864 List *fdw_private;
6865 ForeignPath *ordered_path;
6866 ListCell *lc;
6868 /* Shouldn't get here unless the query has ORDER BY */
6869 Assert(parse->sortClause);
6871 /* We don't support cases where there are any SRFs in the targetlist */
6872 if (parse->hasTargetSRFs)
6873 return;
6875 /* Save the input_rel as outerrel in fpinfo */
6876 fpinfo->outerrel = input_rel;
6879 * Copy foreign table, foreign server, user mapping, FDW options etc.
6880 * details from the input relation's fpinfo.
6882 fpinfo->table = ifpinfo->table;
6883 fpinfo->server = ifpinfo->server;
6884 fpinfo->user = ifpinfo->user;
6885 merge_fdw_options(fpinfo, ifpinfo, NULL);
6888 * If the input_rel is a base or join relation, we would already have
6889 * considered pushing down the final sort to the remote server when
6890 * creating pre-sorted foreign paths for that relation, because the
6891 * query_pathkeys is set to the root->sort_pathkeys in that case (see
6892 * standard_qp_callback()).
6894 if (input_rel->reloptkind == RELOPT_BASEREL ||
6895 input_rel->reloptkind == RELOPT_JOINREL)
6897 Assert(root->query_pathkeys == root->sort_pathkeys);
6899 /* Safe to push down if the query_pathkeys is safe to push down */
6900 fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe;
6902 return;
6905 /* The input_rel should be a grouping relation */
6906 Assert(input_rel->reloptkind == RELOPT_UPPER_REL &&
6907 ifpinfo->stage == UPPERREL_GROUP_AGG);
6910 * We try to create a path below by extending a simple foreign path for
6911 * the underlying grouping relation to perform the final sort remotely,
6912 * which is stored into the fdw_private list of the resulting path.
6915 /* Assess if it is safe to push down the final sort */
6916 foreach(lc, root->sort_pathkeys)
6918 PathKey *pathkey = (PathKey *) lfirst(lc);
6919 EquivalenceClass *pathkey_ec = pathkey->pk_eclass;
6922 * is_foreign_expr would detect volatile expressions as well, but
6923 * checking ec_has_volatile here saves some cycles.
6925 if (pathkey_ec->ec_has_volatile)
6926 return;
6929 * Can't push down the sort if pathkey's opfamily is not shippable.
6931 if (!is_shippable(pathkey->pk_opfamily, OperatorFamilyRelationId,
6932 fpinfo))
6933 return;
6936 * The EC must contain a shippable EM that is computed in input_rel's
6937 * reltarget, else we can't push down the sort.
6939 if (find_em_for_rel_target(root,
6940 pathkey_ec,
6941 input_rel) == NULL)
6942 return;
6945 /* Safe to push down */
6946 fpinfo->pushdown_safe = true;
6948 /* Construct PgFdwPathExtraData */
6949 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
6950 fpextra->target = root->upper_targets[UPPERREL_ORDERED];
6951 fpextra->has_final_sort = true;
6953 /* Estimate the costs of performing the final sort remotely */
6954 estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra,
6955 &rows, &width, &startup_cost, &total_cost);
6958 * Build the fdw_private list that will be used by postgresGetForeignPlan.
6959 * Items in the list must match order in enum FdwPathPrivateIndex.
6961 fdw_private = list_make2(makeBoolean(true), makeBoolean(false));
6963 /* Create foreign ordering path */
6964 ordered_path = create_foreign_upper_path(root,
6965 input_rel,
6966 root->upper_targets[UPPERREL_ORDERED],
6967 rows,
6968 startup_cost,
6969 total_cost,
6970 root->sort_pathkeys,
6971 NULL, /* no extra plan */
6972 NIL, /* no fdw_restrictinfo
6973 * list */
6974 fdw_private);
6976 /* and add it to the ordered_rel */
6977 add_path(ordered_rel, (Path *) ordered_path);
6981 * add_foreign_final_paths
6982 * Add foreign paths for performing the final processing remotely.
6984 * Given input_rel contains the source-data Paths. The paths are added to the
6985 * given final_rel.
6987 static void
6988 add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
6989 RelOptInfo *final_rel,
6990 FinalPathExtraData *extra)
6992 Query *parse = root->parse;
6993 PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
6994 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private;
6995 bool has_final_sort = false;
6996 List *pathkeys = NIL;
6997 PgFdwPathExtraData *fpextra;
6998 bool save_use_remote_estimate = false;
6999 double rows;
7000 int width;
7001 Cost startup_cost;
7002 Cost total_cost;
7003 List *fdw_private;
7004 ForeignPath *final_path;
7007 * Currently, we only support this for SELECT commands
7009 if (parse->commandType != CMD_SELECT)
7010 return;
7013 * No work if there is no FOR UPDATE/SHARE clause and if there is no need
7014 * to add a LIMIT node
7016 if (!parse->rowMarks && !extra->limit_needed)
7017 return;
7019 /* We don't support cases where there are any SRFs in the targetlist */
7020 if (parse->hasTargetSRFs)
7021 return;
7023 /* Save the input_rel as outerrel in fpinfo */
7024 fpinfo->outerrel = input_rel;
7027 * Copy foreign table, foreign server, user mapping, FDW options etc.
7028 * details from the input relation's fpinfo.
7030 fpinfo->table = ifpinfo->table;
7031 fpinfo->server = ifpinfo->server;
7032 fpinfo->user = ifpinfo->user;
7033 merge_fdw_options(fpinfo, ifpinfo, NULL);
7036 * If there is no need to add a LIMIT node, there might be a ForeignPath
7037 * in the input_rel's pathlist that implements all behavior of the query.
7038 * Note: we would already have accounted for the query's FOR UPDATE/SHARE
7039 * (if any) before we get here.
7041 if (!extra->limit_needed)
7043 ListCell *lc;
7045 Assert(parse->rowMarks);
7048 * Grouping and aggregation are not supported with FOR UPDATE/SHARE,
7049 * so the input_rel should be a base, join, or ordered relation; and
7050 * if it's an ordered relation, its input relation should be a base or
7051 * join relation.
7053 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
7054 input_rel->reloptkind == RELOPT_JOINREL ||
7055 (input_rel->reloptkind == RELOPT_UPPER_REL &&
7056 ifpinfo->stage == UPPERREL_ORDERED &&
7057 (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL ||
7058 ifpinfo->outerrel->reloptkind == RELOPT_JOINREL)));
7060 foreach(lc, input_rel->pathlist)
7062 Path *path = (Path *) lfirst(lc);
7065 * apply_scanjoin_target_to_paths() uses create_projection_path()
7066 * to adjust each of its input paths if needed, whereas
7067 * create_ordered_paths() uses apply_projection_to_path() to do
7068 * that. So the former might have put a ProjectionPath on top of
7069 * the ForeignPath; look through ProjectionPath and see if the
7070 * path underneath it is ForeignPath.
7072 if (IsA(path, ForeignPath) ||
7073 (IsA(path, ProjectionPath) &&
7074 IsA(((ProjectionPath *) path)->subpath, ForeignPath)))
7077 * Create foreign final path; this gets rid of a
7078 * no-longer-needed outer plan (if any), which makes the
7079 * EXPLAIN output look cleaner
7081 final_path = create_foreign_upper_path(root,
7082 path->parent,
7083 path->pathtarget,
7084 path->rows,
7085 path->startup_cost,
7086 path->total_cost,
7087 path->pathkeys,
7088 NULL, /* no extra plan */
7089 NIL, /* no fdw_restrictinfo
7090 * list */
7091 NIL); /* no fdw_private */
7093 /* and add it to the final_rel */
7094 add_path(final_rel, (Path *) final_path);
7096 /* Safe to push down */
7097 fpinfo->pushdown_safe = true;
7099 return;
7104 * If we get here it means no ForeignPaths; since we would already
7105 * have considered pushing down all operations for the query to the
7106 * remote server, give up on it.
7108 return;
7111 Assert(extra->limit_needed);
7114 * If the input_rel is an ordered relation, replace the input_rel with its
7115 * input relation
7117 if (input_rel->reloptkind == RELOPT_UPPER_REL &&
7118 ifpinfo->stage == UPPERREL_ORDERED)
7120 input_rel = ifpinfo->outerrel;
7121 ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
7122 has_final_sort = true;
7123 pathkeys = root->sort_pathkeys;
7126 /* The input_rel should be a base, join, or grouping relation */
7127 Assert(input_rel->reloptkind == RELOPT_BASEREL ||
7128 input_rel->reloptkind == RELOPT_JOINREL ||
7129 (input_rel->reloptkind == RELOPT_UPPER_REL &&
7130 ifpinfo->stage == UPPERREL_GROUP_AGG));
7133 * We try to create a path below by extending a simple foreign path for
7134 * the underlying base, join, or grouping relation to perform the final
7135 * sort (if has_final_sort) and the LIMIT restriction remotely, which is
7136 * stored into the fdw_private list of the resulting path. (We
7137 * re-estimate the costs of sorting the underlying relation, if
7138 * has_final_sort.)
7142 * Assess if it is safe to push down the LIMIT and OFFSET to the remote
7143 * server
7147 * If the underlying relation has any local conditions, the LIMIT/OFFSET
7148 * cannot be pushed down.
7150 if (ifpinfo->local_conds)
7151 return;
7154 * If the query has FETCH FIRST .. WITH TIES, 1) it must have ORDER BY as
7155 * well, which is used to determine which additional rows tie for the last
7156 * place in the result set, and 2) ORDER BY must already have been
7157 * determined to be safe to push down before we get here. So in that case
7158 * the FETCH clause is safe to push down with ORDER BY if the remote
7159 * server is v13 or later, but if not, the remote query will fail entirely
7160 * for lack of support for it. Since we do not currently have a way to do
7161 * a remote-version check (without accessing the remote server), disable
7162 * pushing the FETCH clause for now.
7164 if (parse->limitOption == LIMIT_OPTION_WITH_TIES)
7165 return;
7168 * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are
7169 * not safe to remote.
7171 if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) ||
7172 !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount))
7173 return;
7175 /* Safe to push down */
7176 fpinfo->pushdown_safe = true;
7178 /* Construct PgFdwPathExtraData */
7179 fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData));
7180 fpextra->target = root->upper_targets[UPPERREL_FINAL];
7181 fpextra->has_final_sort = has_final_sort;
7182 fpextra->has_limit = extra->limit_needed;
7183 fpextra->limit_tuples = extra->limit_tuples;
7184 fpextra->count_est = extra->count_est;
7185 fpextra->offset_est = extra->offset_est;
7188 * Estimate the costs of performing the final sort and the LIMIT
7189 * restriction remotely. If has_final_sort is false, we wouldn't need to
7190 * execute EXPLAIN anymore if use_remote_estimate, since the costs can be
7191 * roughly estimated using the costs we already have for the underlying
7192 * relation, in the same way as when use_remote_estimate is false. Since
7193 * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to
7194 * false in that case.
7196 if (!fpextra->has_final_sort)
7198 save_use_remote_estimate = ifpinfo->use_remote_estimate;
7199 ifpinfo->use_remote_estimate = false;
7201 estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra,
7202 &rows, &width, &startup_cost, &total_cost);
7203 if (!fpextra->has_final_sort)
7204 ifpinfo->use_remote_estimate = save_use_remote_estimate;
7207 * Build the fdw_private list that will be used by postgresGetForeignPlan.
7208 * Items in the list must match order in enum FdwPathPrivateIndex.
7210 fdw_private = list_make2(makeBoolean(has_final_sort),
7211 makeBoolean(extra->limit_needed));
7214 * Create foreign final path; this gets rid of a no-longer-needed outer
7215 * plan (if any), which makes the EXPLAIN output look cleaner
7217 final_path = create_foreign_upper_path(root,
7218 input_rel,
7219 root->upper_targets[UPPERREL_FINAL],
7220 rows,
7221 startup_cost,
7222 total_cost,
7223 pathkeys,
7224 NULL, /* no extra plan */
7225 NIL, /* no fdw_restrictinfo list */
7226 fdw_private);
7228 /* and add it to the final_rel */
7229 add_path(final_rel, (Path *) final_path);
7233 * postgresIsForeignPathAsyncCapable
7234 * Check whether a given ForeignPath node is async-capable.
7236 static bool
7237 postgresIsForeignPathAsyncCapable(ForeignPath *path)
7239 RelOptInfo *rel = ((Path *) path)->parent;
7240 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
7242 return fpinfo->async_capable;
7246 * postgresForeignAsyncRequest
7247 * Asynchronously request next tuple from a foreign PostgreSQL table.
7249 static void
7250 postgresForeignAsyncRequest(AsyncRequest *areq)
7252 produce_tuple_asynchronously(areq, true);
7256 * postgresForeignAsyncConfigureWait
7257 * Configure a file descriptor event for which we wish to wait.
7259 static void
7260 postgresForeignAsyncConfigureWait(AsyncRequest *areq)
7262 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7263 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7264 AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
7265 AppendState *requestor = (AppendState *) areq->requestor;
7266 WaitEventSet *set = requestor->as_eventset;
7268 /* This should not be called unless callback_pending */
7269 Assert(areq->callback_pending);
7272 * If process_pending_request() has been invoked on the given request
7273 * before we get here, we might have some tuples already; in which case
7274 * complete the request
7276 if (fsstate->next_tuple < fsstate->num_tuples)
7278 complete_pending_request(areq);
7279 if (areq->request_complete)
7280 return;
7281 Assert(areq->callback_pending);
7284 /* We must have run out of tuples */
7285 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7287 /* The core code would have registered postmaster death event */
7288 Assert(GetNumRegisteredWaitEvents(set) >= 1);
7290 /* Begin an asynchronous data fetch if not already done */
7291 if (!pendingAreq)
7292 fetch_more_data_begin(areq);
7293 else if (pendingAreq->requestor != areq->requestor)
7296 * This is the case when the in-process request was made by another
7297 * Append. Note that it might be useless to process the request made
7298 * by that Append, because the query might not need tuples from that
7299 * Append anymore; so we avoid processing it to begin a fetch for the
7300 * given request if possible. If there are any child subplans of the
7301 * same parent that are ready for new requests, skip the given
7302 * request. Likewise, if there are any configured events other than
7303 * the postmaster death event, skip it. Otherwise, process the
7304 * in-process request, then begin a fetch to configure the event
7305 * below, because we might otherwise end up with no configured events
7306 * other than the postmaster death event.
7308 if (!bms_is_empty(requestor->as_needrequest))
7309 return;
7310 if (GetNumRegisteredWaitEvents(set) > 1)
7311 return;
7312 process_pending_request(pendingAreq);
7313 fetch_more_data_begin(areq);
7315 else if (pendingAreq->requestee != areq->requestee)
7318 * This is the case when the in-process request was made by the same
7319 * parent but for a different child. Since we configure only the
7320 * event for the request made for that child, skip the given request.
7322 return;
7324 else
7325 Assert(pendingAreq == areq);
7327 AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
7328 NULL, areq);
7332 * postgresForeignAsyncNotify
7333 * Fetch some more tuples from a file descriptor that becomes ready,
7334 * requesting next tuple.
7336 static void
7337 postgresForeignAsyncNotify(AsyncRequest *areq)
7339 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7340 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7342 /* The core code would have initialized the callback_pending flag */
7343 Assert(!areq->callback_pending);
7346 * If process_pending_request() has been invoked on the given request
7347 * before we get here, we might have some tuples already; in which case
7348 * produce the next tuple
7350 if (fsstate->next_tuple < fsstate->num_tuples)
7352 produce_tuple_asynchronously(areq, true);
7353 return;
7356 /* We must have run out of tuples */
7357 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7359 /* The request should be currently in-process */
7360 Assert(fsstate->conn_state->pendingAreq == areq);
7362 /* On error, report the original query, not the FETCH. */
7363 if (!PQconsumeInput(fsstate->conn))
7364 pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
7366 fetch_more_data(node);
7368 produce_tuple_asynchronously(areq, true);
7372 * Asynchronously produce next tuple from a foreign PostgreSQL table.
7374 static void
7375 produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
7377 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7378 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7379 AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
7380 TupleTableSlot *result;
7382 /* This should not be called if the request is currently in-process */
7383 Assert(areq != pendingAreq);
7385 /* Fetch some more tuples, if we've run out */
7386 if (fsstate->next_tuple >= fsstate->num_tuples)
7388 /* No point in another fetch if we already detected EOF, though */
7389 if (!fsstate->eof_reached)
7391 /* Mark the request as pending for a callback */
7392 ExecAsyncRequestPending(areq);
7393 /* Begin another fetch if requested and if no pending request */
7394 if (fetch && !pendingAreq)
7395 fetch_more_data_begin(areq);
7397 else
7399 /* There's nothing more to do; just return a NULL pointer */
7400 result = NULL;
7401 /* Mark the request as complete */
7402 ExecAsyncRequestDone(areq, result);
7404 return;
7407 /* Get a tuple from the ForeignScan node */
7408 result = areq->requestee->ExecProcNodeReal(areq->requestee);
7409 if (!TupIsNull(result))
7411 /* Mark the request as complete */
7412 ExecAsyncRequestDone(areq, result);
7413 return;
7416 /* We must have run out of tuples */
7417 Assert(fsstate->next_tuple >= fsstate->num_tuples);
7419 /* Fetch some more tuples, if we've not detected EOF yet */
7420 if (!fsstate->eof_reached)
7422 /* Mark the request as pending for a callback */
7423 ExecAsyncRequestPending(areq);
7424 /* Begin another fetch if requested and if no pending request */
7425 if (fetch && !pendingAreq)
7426 fetch_more_data_begin(areq);
7428 else
7430 /* There's nothing more to do; just return a NULL pointer */
7431 result = NULL;
7432 /* Mark the request as complete */
7433 ExecAsyncRequestDone(areq, result);
7438 * Begin an asynchronous data fetch.
7440 * Note: this function assumes there is no currently-in-progress asynchronous
7441 * data fetch.
7443 * Note: fetch_more_data must be called to fetch the result.
7445 static void
7446 fetch_more_data_begin(AsyncRequest *areq)
7448 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7449 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7450 char sql[64];
7452 Assert(!fsstate->conn_state->pendingAreq);
7454 /* Create the cursor synchronously. */
7455 if (!fsstate->cursor_exists)
7456 create_cursor(node);
7458 /* We will send this query, but not wait for the response. */
7459 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
7460 fsstate->fetch_size, fsstate->cursor_number);
7462 if (!PQsendQuery(fsstate->conn, sql))
7463 pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
7465 /* Remember that the request is in process */
7466 fsstate->conn_state->pendingAreq = areq;
7470 * Process a pending asynchronous request.
7472 void
7473 process_pending_request(AsyncRequest *areq)
7475 ForeignScanState *node = (ForeignScanState *) areq->requestee;
7476 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
7478 /* The request would have been pending for a callback */
7479 Assert(areq->callback_pending);
7481 /* The request should be currently in-process */
7482 Assert(fsstate->conn_state->pendingAreq == areq);
7484 fetch_more_data(node);
7487 * If we didn't get any tuples, must be end of data; complete the request
7488 * now. Otherwise, we postpone completing the request until we are called
7489 * from postgresForeignAsyncConfigureWait()/postgresForeignAsyncNotify().
7491 if (fsstate->next_tuple >= fsstate->num_tuples)
7493 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7494 areq->callback_pending = false;
7495 /* Mark the request as complete */
7496 ExecAsyncRequestDone(areq, NULL);
7497 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7498 ExecAsyncResponse(areq);
7503 * Complete a pending asynchronous request.
7505 static void
7506 complete_pending_request(AsyncRequest *areq)
7508 /* The request would have been pending for a callback */
7509 Assert(areq->callback_pending);
7511 /* Unlike AsyncNotify, we unset callback_pending ourselves */
7512 areq->callback_pending = false;
7514 /* We begin a fetch afterwards if necessary; don't fetch */
7515 produce_tuple_asynchronously(areq, false);
7517 /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
7518 ExecAsyncResponse(areq);
7520 /* Also, we do instrumentation ourselves, if required */
7521 if (areq->requestee->instrument)
7522 InstrUpdateTupleCount(areq->requestee->instrument,
7523 TupIsNull(areq->result) ? 0.0 : 1.0);
7527 * Create a tuple from the specified row of the PGresult.
7529 * rel is the local representation of the foreign table, attinmeta is
7530 * conversion data for the rel's tupdesc, and retrieved_attrs is an
7531 * integer list of the table column numbers present in the PGresult.
7532 * fsstate is the ForeignScan plan node's execution state.
7533 * temp_context is a working context that can be reset after each tuple.
7535 * Note: either rel or fsstate, but not both, can be NULL. rel is NULL
7536 * if we're processing a remote join, while fsstate is NULL in a non-query
7537 * context such as ANALYZE, or if we're processing a non-scan query node.
7539 static HeapTuple
7540 make_tuple_from_result_row(PGresult *res,
7541 int row,
7542 Relation rel,
7543 AttInMetadata *attinmeta,
7544 List *retrieved_attrs,
7545 ForeignScanState *fsstate,
7546 MemoryContext temp_context)
7548 HeapTuple tuple;
7549 TupleDesc tupdesc;
7550 Datum *values;
7551 bool *nulls;
7552 ItemPointer ctid = NULL;
7553 ConversionLocation errpos;
7554 ErrorContextCallback errcallback;
7555 MemoryContext oldcontext;
7556 ListCell *lc;
7557 int j;
7559 Assert(row < PQntuples(res));
7562 * Do the following work in a temp context that we reset after each tuple.
7563 * This cleans up not only the data we have direct access to, but any
7564 * cruft the I/O functions might leak.
7566 oldcontext = MemoryContextSwitchTo(temp_context);
7569 * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
7570 * provided, otherwise look to the scan node's ScanTupleSlot.
7572 if (rel)
7573 tupdesc = RelationGetDescr(rel);
7574 else
7576 Assert(fsstate);
7577 tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
7580 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
7581 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
7582 /* Initialize to nulls for any columns not present in result */
7583 memset(nulls, true, tupdesc->natts * sizeof(bool));
7586 * Set up and install callback to report where conversion error occurs.
7588 errpos.cur_attno = 0;
7589 errpos.rel = rel;
7590 errpos.fsstate = fsstate;
7591 errcallback.callback = conversion_error_callback;
7592 errcallback.arg = (void *) &errpos;
7593 errcallback.previous = error_context_stack;
7594 error_context_stack = &errcallback;
7597 * i indexes columns in the relation, j indexes columns in the PGresult.
7599 j = 0;
7600 foreach(lc, retrieved_attrs)
7602 int i = lfirst_int(lc);
7603 char *valstr;
7605 /* fetch next column's textual value */
7606 if (PQgetisnull(res, row, j))
7607 valstr = NULL;
7608 else
7609 valstr = PQgetvalue(res, row, j);
7612 * convert value to internal representation
7614 * Note: we ignore system columns other than ctid and oid in result
7616 errpos.cur_attno = i;
7617 if (i > 0)
7619 /* ordinary column */
7620 Assert(i <= tupdesc->natts);
7621 nulls[i - 1] = (valstr == NULL);
7622 /* Apply the input function even to nulls, to support domains */
7623 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
7624 valstr,
7625 attinmeta->attioparams[i - 1],
7626 attinmeta->atttypmods[i - 1]);
7628 else if (i == SelfItemPointerAttributeNumber)
7630 /* ctid */
7631 if (valstr != NULL)
7633 Datum datum;
7635 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
7636 ctid = (ItemPointer) DatumGetPointer(datum);
7639 errpos.cur_attno = 0;
7641 j++;
7644 /* Uninstall error context callback. */
7645 error_context_stack = errcallback.previous;
7648 * Check we got the expected number of columns. Note: j == 0 and
7649 * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
7651 if (j > 0 && j != PQnfields(res))
7652 elog(ERROR, "remote query result does not match the foreign table");
7655 * Build the result tuple in caller's memory context.
7657 MemoryContextSwitchTo(oldcontext);
7659 tuple = heap_form_tuple(tupdesc, values, nulls);
7662 * If we have a CTID to return, install it in both t_self and t_ctid.
7663 * t_self is the normal place, but if the tuple is converted to a
7664 * composite Datum, t_self will be lost; setting t_ctid allows CTID to be
7665 * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code).
7667 if (ctid)
7668 tuple->t_self = tuple->t_data->t_ctid = *ctid;
7671 * Stomp on the xmin, xmax, and cmin fields from the tuple created by
7672 * heap_form_tuple. heap_form_tuple actually creates the tuple with
7673 * DatumTupleFields, not HeapTupleFields, but the executor expects
7674 * HeapTupleFields and will happily extract system columns on that
7675 * assumption. If we don't do this then, for example, the tuple length
7676 * ends up in the xmin field, which isn't what we want.
7678 HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId);
7679 HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId);
7680 HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId);
7682 /* Clean up */
7683 MemoryContextReset(temp_context);
7685 return tuple;
7689 * Callback function which is called when error occurs during column value
7690 * conversion. Print names of column and relation.
7692 * Note that this function mustn't do any catalog lookups, since we are in
7693 * an already-failed transaction. Fortunately, we can get the needed info
7694 * from the relation or the query's rangetable instead.
7696 static void
7697 conversion_error_callback(void *arg)
7699 ConversionLocation *errpos = (ConversionLocation *) arg;
7700 Relation rel = errpos->rel;
7701 ForeignScanState *fsstate = errpos->fsstate;
7702 const char *attname = NULL;
7703 const char *relname = NULL;
7704 bool is_wholerow = false;
7707 * If we're in a scan node, always use aliases from the rangetable, for
7708 * consistency between the simple-relation and remote-join cases. Look at
7709 * the relation's tupdesc only if we're not in a scan node.
7711 if (fsstate)
7713 /* ForeignScan case */
7714 ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan);
7715 int varno = 0;
7716 AttrNumber colno = 0;
7718 if (fsplan->scan.scanrelid > 0)
7720 /* error occurred in a scan against a foreign table */
7721 varno = fsplan->scan.scanrelid;
7722 colno = errpos->cur_attno;
7724 else
7726 /* error occurred in a scan against a foreign join */
7727 TargetEntry *tle;
7729 tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist,
7730 errpos->cur_attno - 1);
7733 * Target list can have Vars and expressions. For Vars, we can
7734 * get some information, however for expressions we can't. Thus
7735 * for expressions, just show generic context message.
7737 if (IsA(tle->expr, Var))
7739 Var *var = (Var *) tle->expr;
7741 varno = var->varno;
7742 colno = var->varattno;
7746 if (varno > 0)
7748 EState *estate = fsstate->ss.ps.state;
7749 RangeTblEntry *rte = exec_rt_fetch(varno, estate);
7751 relname = rte->eref->aliasname;
7753 if (colno == 0)
7754 is_wholerow = true;
7755 else if (colno > 0 && colno <= list_length(rte->eref->colnames))
7756 attname = strVal(list_nth(rte->eref->colnames, colno - 1));
7757 else if (colno == SelfItemPointerAttributeNumber)
7758 attname = "ctid";
7761 else if (rel)
7763 /* Non-ForeignScan case (we should always have a rel here) */
7764 TupleDesc tupdesc = RelationGetDescr(rel);
7766 relname = RelationGetRelationName(rel);
7767 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
7769 Form_pg_attribute attr = TupleDescAttr(tupdesc,
7770 errpos->cur_attno - 1);
7772 attname = NameStr(attr->attname);
7774 else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
7775 attname = "ctid";
7778 if (relname && is_wholerow)
7779 errcontext("whole-row reference to foreign table \"%s\"", relname);
7780 else if (relname && attname)
7781 errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
7782 else
7783 errcontext("processing expression at position %d in select list",
7784 errpos->cur_attno);
7788 * Given an EquivalenceClass and a foreign relation, find an EC member
7789 * that can be used to sort the relation remotely according to a pathkey
7790 * using this EC.
7792 * If there is more than one suitable candidate, return an arbitrary
7793 * one of them. If there is none, return NULL.
7795 * This checks that the EC member expression uses only Vars from the given
7796 * rel and is shippable. Caller must separately verify that the pathkey's
7797 * ordering operator is shippable.
7799 EquivalenceMember *
7800 find_em_for_rel(PlannerInfo *root, EquivalenceClass *ec, RelOptInfo *rel)
7802 ListCell *lc;
7804 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
7806 foreach(lc, ec->ec_members)
7808 EquivalenceMember *em = (EquivalenceMember *) lfirst(lc);
7811 * Note we require !bms_is_empty, else we'd accept constant
7812 * expressions which are not suitable for the purpose.
7814 if (bms_is_subset(em->em_relids, rel->relids) &&
7815 !bms_is_empty(em->em_relids) &&
7816 bms_is_empty(bms_intersect(em->em_relids, fpinfo->hidden_subquery_rels)) &&
7817 is_foreign_expr(root, rel, em->em_expr))
7818 return em;
7821 return NULL;
7825 * Find an EquivalenceClass member that is to be computed as a sort column
7826 * in the given rel's reltarget, and is shippable.
7828 * If there is more than one suitable candidate, return an arbitrary
7829 * one of them. If there is none, return NULL.
7831 * This checks that the EC member expression uses only Vars from the given
7832 * rel and is shippable. Caller must separately verify that the pathkey's
7833 * ordering operator is shippable.
7835 EquivalenceMember *
7836 find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec,
7837 RelOptInfo *rel)
7839 PathTarget *target = rel->reltarget;
7840 ListCell *lc1;
7841 int i;
7843 i = 0;
7844 foreach(lc1, target->exprs)
7846 Expr *expr = (Expr *) lfirst(lc1);
7847 Index sgref = get_pathtarget_sortgroupref(target, i);
7848 ListCell *lc2;
7850 /* Ignore non-sort expressions */
7851 if (sgref == 0 ||
7852 get_sortgroupref_clause_noerr(sgref,
7853 root->parse->sortClause) == NULL)
7855 i++;
7856 continue;
7859 /* We ignore binary-compatible relabeling on both ends */
7860 while (expr && IsA(expr, RelabelType))
7861 expr = ((RelabelType *) expr)->arg;
7863 /* Locate an EquivalenceClass member matching this expr, if any */
7864 foreach(lc2, ec->ec_members)
7866 EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2);
7867 Expr *em_expr;
7869 /* Don't match constants */
7870 if (em->em_is_const)
7871 continue;
7873 /* Ignore child members */
7874 if (em->em_is_child)
7875 continue;
7877 /* Match if same expression (after stripping relabel) */
7878 em_expr = em->em_expr;
7879 while (em_expr && IsA(em_expr, RelabelType))
7880 em_expr = ((RelabelType *) em_expr)->arg;
7882 if (!equal(em_expr, expr))
7883 continue;
7885 /* Check that expression (including relabels!) is shippable */
7886 if (is_foreign_expr(root, rel, em->em_expr))
7887 return em;
7890 i++;
7893 return NULL;
7897 * Determine batch size for a given foreign table. The option specified for
7898 * a table has precedence.
7900 static int
7901 get_batch_size_option(Relation rel)
7903 Oid foreigntableid = RelationGetRelid(rel);
7904 ForeignTable *table;
7905 ForeignServer *server;
7906 List *options;
7907 ListCell *lc;
7909 /* we use 1 by default, which means "no batching" */
7910 int batch_size = 1;
7913 * Load options for table and server. We append server options after table
7914 * options, because table options take precedence.
7916 table = GetForeignTable(foreigntableid);
7917 server = GetForeignServer(table->serverid);
7919 options = NIL;
7920 options = list_concat(options, table->options);
7921 options = list_concat(options, server->options);
7923 /* See if either table or server specifies batch_size. */
7924 foreach(lc, options)
7926 DefElem *def = (DefElem *) lfirst(lc);
7928 if (strcmp(def->defname, "batch_size") == 0)
7930 (void) parse_int(defGetString(def), &batch_size, 0, NULL);
7931 break;
7935 return batch_size;