/*
 * task.c
 *		framework for parallelizing pg_upgrade's once-in-each-database tasks
 *
 * This framework provides an efficient way of running the various
 * once-in-each-database tasks required by pg_upgrade.  Specifically, it
 * parallelizes these tasks by managing a set of slots that follow a simple
 * state machine and by using libpq's asynchronous APIs to establish the
 * connections and run the queries.  Callers simply need to create a callback
 * function and build/execute an UpgradeTask.  A simple example follows:
 *
 *		static void
 *		my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
 *		{
 *			for (int i = 0; i < PQntuples(res); i++)
 *			{
 *				... process results ...
 *			}
 *		}
 *
 *		void
 *		my_task(ClusterInfo *cluster)
 *		{
 *			UpgradeTask *task = upgrade_task_create();
 *
 *			upgrade_task_add_step(task,
 *								  "... query text ...",
 *								  my_process_cb,
 *								  true,		// let the task free the PGresult
 *								  NULL);	// "arg" pointer for callback
 *			upgrade_task_run(task, cluster);
 *			upgrade_task_free(task);
 *		}
 *
 * Note that multiple steps can be added to a given task.  When there are
 * multiple steps, the task will run all of the steps consecutively in the same
 * database connection before freeing the connection and moving on.  In other
 * words, it only ever initiates one connection to each database in the
 * cluster for a given run.
 *
 * Copyright (c) 2024-2025, PostgreSQL Global Development Group
 * src/bin/pg_upgrade/task.c
 */
45 #include "postgres_fe.h"
47 #include "common/connect.h"
48 #include "fe_utils/string_utils.h"
49 #include "pg_upgrade.h"
/*
 * dbs_complete stores the number of databases that we have completed
 * processing.  When this value equals the number of databases in the cluster,
 * the task is finished.
 */
static int	dbs_complete;
/*
 * dbs_processing stores the index of the next database in the cluster's array
 * of databases that will be picked up for processing.  It will always be
 * greater than or equal to dbs_complete.
 */
static int	dbs_processing;
66 * This struct stores the information for a single step of a task. Note that
67 * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
68 * All steps in a task are run in a single connection before moving on to the
69 * next database (which requires a new connection).
71 typedef struct UpgradeTaskStep
73 UpgradeTaskProcessCB process_cb
; /* processes the results of the query */
74 bool free_result
; /* should we free the result? */
75 void *arg
; /* pointer passed to process_cb */
79 * This struct is a thin wrapper around an array of steps, i.e.,
80 * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
84 UpgradeTaskStep
*steps
;
/*
 * The different states for a parallel slot.
 *
 * FREE has to remain the first enumerator (value 0): slots are allocated
 * zero-filled and are reset with memset(..., 0, ...) once their database is
 * done, and both of those must leave the slot in the FREE state.
 */
typedef enum UpgradeTaskSlotState
{
	FREE,						/* slot available for use in a new database */
	CONNECTING,					/* waiting for connection to be established */
	RUNNING_QUERIES,			/* running/processing queries in the task */
} UpgradeTaskSlotState;
100 * We maintain an array of user_opts.jobs slots to execute the task.
102 typedef struct UpgradeTaskSlot
104 UpgradeTaskSlotState state
; /* state of the slot */
105 int db_idx
; /* index of the database assigned to slot */
106 int step_idx
; /* index of the current step of task */
107 PGconn
*conn
; /* current connection managed by slot */
108 bool ready
; /* slot is ready for processing */
109 bool select_mode
; /* select() mode: true->read, false->write */
110 int sock
; /* file descriptor for connection's socket */
114 * Initializes an UpgradeTask.
117 upgrade_task_create(void)
119 UpgradeTask
*task
= pg_malloc0(sizeof(UpgradeTask
));
121 task
->queries
= createPQExpBuffer();
123 /* All tasks must first set a secure search_path. */
124 upgrade_task_add_step(task
, ALWAYS_SECURE_SEARCH_PATH_SQL
, NULL
, true, NULL
);
130 * Frees all storage associated with an UpgradeTask.
133 upgrade_task_free(UpgradeTask
*task
)
135 destroyPQExpBuffer(task
->queries
);
136 pg_free(task
->steps
);
141 * Adds a step to an UpgradeTask. The steps will be executed in each database
142 * in the order in which they are added.
144 * task: task object that must have been initialized via upgrade_task_create()
145 * query: the query text
146 * process_cb: function that processes the results of the query
147 * free_result: should we free the PGresult, or leave it to the caller?
148 * arg: pointer to task-specific data that is passed to each callback
151 upgrade_task_add_step(UpgradeTask
*task
, const char *query
,
152 UpgradeTaskProcessCB process_cb
, bool free_result
,
155 UpgradeTaskStep
*new_step
;
157 task
->steps
= pg_realloc(task
->steps
,
158 ++task
->num_steps
* sizeof(UpgradeTaskStep
));
160 new_step
= &task
->steps
[task
->num_steps
- 1];
161 new_step
->process_cb
= process_cb
;
162 new_step
->free_result
= free_result
;
165 appendPQExpBuffer(task
->queries
, "%s;", query
);
169 * Build a connection string for the slot's current database and asynchronously
170 * start a new connection, but do not wait for the connection to be
174 start_conn(const ClusterInfo
*cluster
, UpgradeTaskSlot
*slot
)
176 PQExpBufferData conn_opts
;
177 DbInfo
*dbinfo
= &cluster
->dbarr
.dbs
[slot
->db_idx
];
179 /* Build connection string with proper quoting */
180 initPQExpBuffer(&conn_opts
);
181 appendPQExpBufferStr(&conn_opts
, "dbname=");
182 appendConnStrVal(&conn_opts
, dbinfo
->db_name
);
183 appendPQExpBufferStr(&conn_opts
, " user=");
184 appendConnStrVal(&conn_opts
, os_info
.user
);
185 appendPQExpBuffer(&conn_opts
, " port=%d", cluster
->port
);
186 if (cluster
->sockdir
)
188 appendPQExpBufferStr(&conn_opts
, " host=");
189 appendConnStrVal(&conn_opts
, cluster
->sockdir
);
192 slot
->conn
= PQconnectStart(conn_opts
.data
);
195 pg_fatal("failed to create connection with connection string: \"%s\"",
198 termPQExpBuffer(&conn_opts
);
202 * Run the process_cb callback function to process the result of a query, and
203 * free the result if the caller indicated we should do so.
206 process_query_result(const ClusterInfo
*cluster
, UpgradeTaskSlot
*slot
,
207 const UpgradeTask
*task
)
209 UpgradeTaskStep
*steps
= &task
->steps
[slot
->step_idx
];
210 UpgradeTaskProcessCB process_cb
= steps
->process_cb
;
211 DbInfo
*dbinfo
= &cluster
->dbarr
.dbs
[slot
->db_idx
];
212 PGresult
*res
= PQgetResult(slot
->conn
);
214 if (PQstatus(slot
->conn
) == CONNECTION_BAD
||
215 (PQresultStatus(res
) != PGRES_TUPLES_OK
&&
216 PQresultStatus(res
) != PGRES_COMMAND_OK
))
217 pg_fatal("connection failure: %s", PQerrorMessage(slot
->conn
));
220 * We assume that a NULL process_cb callback function means there's
221 * nothing to process. This is primarily intended for the initial step in
222 * every task that sets a safe search_path.
225 (*process_cb
) (dbinfo
, res
, steps
->arg
);
227 if (steps
->free_result
)
232 * Advances the state machine for a given slot as necessary.
235 process_slot(const ClusterInfo
*cluster
, UpgradeTaskSlot
*slot
, const UpgradeTask
*task
)
237 PostgresPollingStatusType status
;
247 * If all of the databases in the cluster have been processed or
248 * are currently being processed by other slots, we are done.
250 if (dbs_processing
>= cluster
->dbarr
.ndbs
)
254 * Claim the next database in the cluster's array and initiate a
257 slot
->db_idx
= dbs_processing
++;
258 slot
->state
= CONNECTING
;
259 start_conn(cluster
, slot
);
265 /* Check for connection failure. */
266 status
= PQconnectPoll(slot
->conn
);
267 if (status
== PGRES_POLLING_FAILED
)
268 pg_fatal("connection failure: %s", PQerrorMessage(slot
->conn
));
270 /* Check whether the connection is still establishing. */
271 if (status
!= PGRES_POLLING_OK
)
273 slot
->select_mode
= (status
== PGRES_POLLING_READING
);
278 * Move on to running/processing the queries in the task.
280 slot
->state
= RUNNING_QUERIES
;
281 slot
->select_mode
= true; /* wait until ready for reading */
282 if (!PQsendQuery(slot
->conn
, task
->queries
->data
))
283 pg_fatal("connection failure: %s", PQerrorMessage(slot
->conn
));
287 case RUNNING_QUERIES
:
290 * Consume any available data and clear the read-ready indicator
291 * for the connection.
293 if (!PQconsumeInput(slot
->conn
))
294 pg_fatal("connection failure: %s", PQerrorMessage(slot
->conn
));
297 * Process any results that are ready so that we can free up this
298 * slot for another database as soon as possible.
300 for (; slot
->step_idx
< task
->num_steps
; slot
->step_idx
++)
302 /* If no more results are available yet, move on. */
303 if (PQisBusy(slot
->conn
))
306 process_query_result(cluster
, slot
, task
);
310 * If we just finished processing the result of the last step in
311 * the task, free the slot. We recursively call this function on
312 * the newly-freed slot so that we can start initiating the next
313 * connection immediately instead of waiting for the next loop
317 PQfinish(slot
->conn
);
318 memset(slot
, 0, sizeof(UpgradeTaskSlot
));
321 process_slot(cluster
, slot
, task
);
/*
 * Wait with select() until at least one descriptor in *input is readable or
 * one in *output is writable, retrying if the call is interrupted.
 *
 * maxFd: highest descriptor number present in either set, or 0 for none
 * input/output: descriptor sets; overwritten with the ready descriptors
 *
 * Returns -1 on error, else the number of ready descriptors.
 *
 * NOTE(review): the retry loop, the platform conditionals and the maxFd == 0
 * guard are not visible in the extracted text and are restored here.  The
 * guard is needed because the caller skips FREE slots and can therefore pass
 * empty sets; select() with empty sets and no timeout would block forever.
 * Confirm against the upstream file.
 */
static int
select_loop(int maxFd, fd_set *input, fd_set *output)
{
	fd_set		save_input = *input;
	fd_set		save_output = *output;

	if (maxFd == 0)
		return 0;

	for (;;)
	{
		int			i;

		/* select() modifies the sets, so restore them before every attempt. */
		*input = save_input;
		*output = save_output;

		i = select(maxFd + 1, input, output, NULL, NULL);

#ifndef WIN32
		if (i < 0 && errno == EINTR)
			continue;
#else
		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
			continue;
#endif
		return i;
	}
}
360 * Wait on the slots to either finish connecting or to receive query results if
361 * possible. This avoids a tight loop in upgrade_task_run().
364 wait_on_slots(UpgradeTaskSlot
*slots
, int numslots
)
373 for (int i
= 0; i
< numslots
; i
++)
376 * We assume the previous call to process_slot() handled everything
377 * that was marked ready in the previous call to wait_on_slots(), if
380 slots
[i
].ready
= false;
383 * This function should only ever see free slots as we are finishing
384 * processing the last few databases, at which point we don't have any
385 * databases left for them to process. We'll never use these slots
386 * again, so we can safely ignore them.
388 if (slots
[i
].state
== FREE
)
392 * Add the socket to the set.
394 slots
[i
].sock
= PQsocket(slots
[i
].conn
);
395 if (slots
[i
].sock
< 0)
396 pg_fatal("invalid socket");
397 FD_SET(slots
[i
].sock
, slots
[i
].select_mode
? &input
: &output
);
398 maxFd
= Max(maxFd
, slots
[i
].sock
);
402 * If we found socket(s) to wait on, wait.
404 if (select_loop(maxFd
, &input
, &output
) == -1)
405 pg_fatal("select() failed: %m");
408 * Mark which sockets appear to be ready.
410 for (int i
= 0; i
< numslots
; i
++)
411 slots
[i
].ready
|= (FD_ISSET(slots
[i
].sock
, &input
) ||
412 FD_ISSET(slots
[i
].sock
, &output
));
416 * Runs all the steps of the task in every database in the cluster using
417 * user_opts.jobs parallel slots.
420 upgrade_task_run(const UpgradeTask
*task
, const ClusterInfo
*cluster
)
422 int jobs
= Max(1, user_opts
.jobs
);
423 UpgradeTaskSlot
*slots
= pg_malloc0(sizeof(UpgradeTaskSlot
) * jobs
);
429 * Process every slot the first time round.
431 for (int i
= 0; i
< jobs
; i
++)
432 slots
[i
].ready
= true;
434 while (dbs_complete
< cluster
->dbarr
.ndbs
)
436 for (int i
= 0; i
< jobs
; i
++)
437 process_slot(cluster
, &slots
[i
], task
);
439 wait_on_slots(slots
, jobs
);