1 /*-------------------------------------------------------------------------
4 * Parallel support for front-end parallel database connections
7 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/fe_utils/parallel_slot.c
12 *-------------------------------------------------------------------------
15 #if defined(WIN32) && FD_SETSIZE < 1024
16 #error FD_SETSIZE needs to have been increased
19 #include "postgres_fe.h"
21 #include <sys/select.h>
23 #include "common/logging.h"
24 #include "fe_utils/cancel.h"
25 #include "fe_utils/parallel_slot.h"
26 #include "fe_utils/query_utils.h"
/* SQLSTATE value that TableCommandResultHandler compares error results against. */
28 #define ERRCODE_UNDEFINED_TABLE "42P01"
/* Forward declarations of static helpers that are defined further down. */
30 static int select_loop(int maxFd
, fd_set
*workerset
);
31 static bool processQueryResult(ParallelSlot
*slot
, PGresult
*result
);
34 * Process (and delete) a query result. Returns true if there's no problem,
35 * false otherwise. It's up to the handler to decide what constitutes a
/*
 * processQueryResult: hand one PGresult to the handler stored in the slot.
 *
 * slot:   slot whose handler, connection and handler_context fields are used
 * result: result passed as the first argument to the handler
 *
 * Declared above as returning bool.
 *
 * NOTE(review): the statements executed on each branch are not visible in
 * this view of the file. Per the existing comments the handler frees the
 * result on failure and this function frees it otherwise -- confirm against
 * the full source.
 */
39 processQueryResult(ParallelSlot
*slot
, PGresult
*result
)
/* A handler must already be installed on the slot. */
41 Assert(slot
->handler
!= NULL
);
43 /* On failure, the handler should return NULL after freeing the result */
/* The handler is called with the result, the slot connection and the slot context. */
44 if (!slot
->handler(result
, slot
->connection
, slot
->handler_context
))
47 /* Ok, we have to free it ourselves */
53 * Consume all the results generated for the given connection until
54 * nothing remains. If at least one error is encountered, return false.
55 * Note that this will block if the connection is busy.
/*
 * consumeQueryResult: drain every pending result on the slot connection.
 *
 * slot: slot whose connection is read with PQgetResult until NULL comes back;
 *       each non-NULL result is forwarded to processQueryResult.
 *
 * NOTE(review): local declarations, the handling of a failed
 * processQueryResult call and the return statement are not visible here. The
 * existing header comment says false is returned if at least one error is
 * encountered -- confirm against the full source.
 */
58 consumeQueryResult(ParallelSlot
*slot
)
/* Presumably makes this connection the target of cancel requests while draining -- verify in fe_utils/cancel.h. */
63 SetCancelConn(slot
->connection
);
/* Loop ends when PQgetResult returns NULL. */
64 while ((result
= PQgetResult(slot
->connection
)) != NULL
)
66 if (!processQueryResult(slot
, result
))
74 * Wait until a file descriptor from the given set becomes readable.
76 * Returns the number of ready descriptors, or -1 on failure (including
77 * getting a cancel request).
/*
 * select_loop: wait with select() on the descriptors in workerset.
 *
 * maxFd:     highest descriptor value of interest; select() is given maxFd + 1
 * workerset: read set handed to select(); it is copied into saveSet on entry
 *            and restored from saveSet before every select() call
 *
 * Declared above as returning int; the visible failure path returns -1.
 *
 * NOTE(review): local declarations, preprocessor guards and the enclosing
 * loop construct are not visible in this view. The existing comments indicate
 * that the timeout and the SOCKET_ERROR / WSAGetLastError handling apply to
 * Windows only -- confirm against the full source.
 */
80 select_loop(int maxFd
, fd_set
*workerset
)
/* Keep a pristine copy so the set can be rebuilt before each select() call. */
83 fd_set saveSet
= *workerset
;
91 * On Windows, we need to check once in a while for cancel requests;
92 * on other platforms we rely on select() returning when interrupted.
/* NOTE(review): tv_usec is 1000000, a full second written in microseconds; {1, 0} would be the normalized spelling -- confirm before changing. */
96 struct timeval tv
= {0, 1000000};
/* Restore the caller-supplied set before waiting again. */
103 *workerset
= saveSet
;
104 i
= select(maxFd
+ 1, workerset
, NULL
, NULL
, tvp
);
/* Winsock error reporting is checked here; the statements taken are not visible in this view. */
107 if (i
== SOCKET_ERROR
)
111 if (WSAGetLastError() == WSAEINTR
)
/* An interrupted select() is retried. */
116 if (i
< 0 && errno
== EINTR
)
117 continue; /* ignore this */
/* Any other failure, or a pending cancel request, is reported as -1. */
118 if (i
< 0 || CancelRequested
)
119 return -1; /* but not this */
121 continue; /* timeout (Win32 only) */
129 * Return the offset of a suitable idle slot, or -1 if none are available. If
130 * the given dbname is not null, only idle slots connected to the given
131 * database are considered suitable, otherwise all idle connected slots are
132 * considered suitable.
/*
 * find_matching_idle_slot: scan the slot array for a reusable connection.
 *
 * sa:     slot array; entries 0 .. numslots - 1 are examined in order
 * dbname: database name compared with PQdb() of each connection, or NULL to
 *         accept any connection (see the dbname == NULL test below)
 *
 * NOTE(review): the statements taken after each test and the return
 * statements are not visible in this view; the existing header comment says
 * the offset of a suitable slot or -1 is returned -- confirm.
 */
135 find_matching_idle_slot(const ParallelSlotArray
*sa
, const char *dbname
)
139 for (i
= 0; i
< sa
->numslots
; i
++)
/* Test: slot is currently in use. */
141 if (sa
->slots
[i
].inUse
)
/* Test: slot has no connection. */
144 if (sa
->slots
[i
].connection
== NULL
)
/* Test: no database requested, or the connection is to the requested database. */
147 if (dbname
== NULL
||
148 strcmp(PQdb(sa
->slots
[i
].connection
), dbname
) == 0)
155 * Return the offset of the first slot without a database connection, or -1 if
156 * all slots are connected.
/*
 * find_unconnected_slot: scan the slot array for an entry whose connection
 * is NULL.
 *
 * sa: slot array; entries 0 .. numslots - 1 are examined in order
 *
 * NOTE(review): the statements taken after each test and the return
 * statements are not visible in this view; the existing header comment says
 * the offset of the first such slot or -1 is returned -- confirm.
 */
159 find_unconnected_slot(const ParallelSlotArray
*sa
)
163 for (i
= 0; i
< sa
->numslots
; i
++)
/* Test: slot is currently in use. */
165 if (sa
->slots
[i
].inUse
)
/* Test: slot has no connection yet. */
168 if (sa
->slots
[i
].connection
== NULL
)
176 * Return the offset of the first idle slot, or -1 if all slots are busy.
/*
 * find_any_idle_slot: scan the slot array for an entry whose inUse flag is
 * false, regardless of its connection state.
 *
 * sa: slot array; entries 0 .. numslots - 1 are examined in order
 *
 * NOTE(review): the return statements are not visible in this view; the
 * existing header comment says the offset of the first idle slot or -1 is
 * returned -- confirm.
 */
179 find_any_idle_slot(const ParallelSlotArray
*sa
)
183 for (i
= 0; i
< sa
->numslots
; i
++)
184 if (!sa
->slots
[i
].inUse
)
191 * Wait for any slot's connection to have query results, consume the results,
192 * and update the slot's status as appropriate. Returns true on success,
193 * false on cancellation, on error, or if no slots are connected.
/*
 * wait_on_slots: wait in select_loop on the sockets of all slots, then
 * collect whatever results are available on each connection.
 *
 * sa: slot array; every entry is expected to have a connection (asserted)
 *
 * Visible steps:
 *   1. walk all slots, add each socket to slotset and remember the first
 *      connection seen in cancelconn;
 *   2. call SetCancelConn(cancelconn) and wait in select_loop;
 *   3. walk all slots again, call PQconsumeInput on sockets flagged in
 *      slotset, and fetch results with PQgetResult while PQisBusy is false;
 *   4. clear inUse and the handler of a slot that has become idle.
 *
 * NOTE(review): local declarations, fd_set initialisation, maxFd tracking,
 * early returns and the branch structure around the result handling are not
 * visible in this view; the existing header comment says true is returned on
 * success and false on cancellation, error, or when no slot is connected --
 * confirm against the full source.
 */
196 wait_on_slots(ParallelSlotArray
*sa
)
/* First connection found below; passed to SetCancelConn before waiting. */
201 PGconn
*cancelconn
= NULL
;
203 /* We must reconstruct the fd_set for each call to select_loop */
206 for (i
= 0; i
< sa
->numslots
; i
++)
210 /* We shouldn't get here if we still have slots without connections */
211 Assert(sa
->slots
[i
].connection
!= NULL
);
213 sock
= PQsocket(sa
->slots
[i
].connection
);
216 * We don't really expect any connections to lose their sockets after
217 * startup, but just in case, cope by ignoring them.
222 /* Keep track of the first valid connection we see. */
223 if (cancelconn
== NULL
)
224 cancelconn
= sa
->slots
[i
].connection
;
/* Register this socket in the read set given to select_loop. */
226 FD_SET(sock
, &slotset
);
232 * If we get this far with no valid connections, processing cannot
235 if (cancelconn
== NULL
)
238 SetCancelConn(cancelconn
);
/* From here on i holds the select_loop result, until it is reused as a loop index below. */
239 i
= select_loop(maxFd
, &slotset
);
246 for (i
= 0; i
< sa
->numslots
; i
++)
250 sock
= PQsocket(sa
->slots
[i
].connection
);
/* Only sockets that select() flagged as readable get their input consumed. */
252 if (sock
>= 0 && FD_ISSET(sock
, &slotset
))
254 /* select() says input is available, so consume it */
/* NOTE(review): the PQconsumeInput return value is not visibly checked here -- confirm whether that is intentional. */
255 PQconsumeInput(sa
->slots
[i
].connection
);
258 /* Collect result(s) as long as any are available */
259 while (!PQisBusy(sa
->slots
[i
].connection
))
261 PGresult
*result
= PQgetResult(sa
->slots
[i
].connection
);
265 /* Handle and discard the command result */
266 if (!processQueryResult(&sa
->slots
[i
], result
))
271 /* This connection has become idle */
272 sa
->slots
[i
].inUse
= false;
273 ParallelSlotClearHandler(&sa
->slots
[i
]);
282 * Open a new database connection using the stored connection parameters and
283 * optionally a given dbname if not null, execute the stored initial command if
284 * any, and associate the new connection with the given slot.
/*
 * connect_slot: open a connection for slot number slotno and store it in
 * that slot.
 *
 * sa:     slot array supplying cparams, progname, echo and initcmd
 * slotno: index into sa->slots of the slot to connect
 * dbname: value placed in cparams->override_dbname around the
 *         connectDatabase call
 *
 * The previous override_dbname value is saved first and restored right after
 * connectDatabase returns, so the shared cparams is left as it was found.
 * pg_fatal is called if the new socket number is not below FD_SETSIZE, since
 * such a descriptor cannot be stored in an fd_set.
 *
 * NOTE(review): any guards around the override_dbname assignment and the
 * executeCommand call are not visible in this view -- confirm against the
 * full source.
 */
287 connect_slot(ParallelSlotArray
*sa
, int slotno
, const char *dbname
)
289 const char *old_override
;
290 ParallelSlot
*slot
= &sa
->slots
[slotno
];
/* Remember the current override so it can be restored below. */
292 old_override
= sa
->cparams
->override_dbname
;
294 sa
->cparams
->override_dbname
= dbname
;
295 slot
->connection
= connectDatabase(sa
->cparams
, sa
->progname
, sa
->echo
, false, true);
/* Put the shared connection parameters back the way they were. */
296 sa
->cparams
->override_dbname
= old_override
;
/* Descriptors at or above FD_SETSIZE cannot be used with select(). */
298 if (PQsocket(slot
->connection
) >= FD_SETSIZE
)
299 pg_fatal("too many jobs for this platform");
301 /* Setup the connection using the supplied command, if any. */
303 executeCommand(slot
->connection
, sa
->initcmd
, sa
->echo
);
307 * ParallelSlotsGetIdle
308 * Return a connection slot that is ready to execute a command.
310 * The slot returned is chosen as follows:
312 * If any idle slot already has an open connection, and if either dbname is
313 * null or the existing connection is to the given database, that slot will be
314 * returned allowing the connection to be reused.
316 * Otherwise, if any idle slot is not yet connected to any database, the slot
317 * will be returned with its connection opened using the stored cparams and
318 * optionally the given dbname if not null.
320 * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
321 * after having its connection disconnected and reconnected using the stored
322 * cparams and optionally the given dbname if not null.
324 * Otherwise, if any slots have connections that are busy, we loop on select()
325 * until one socket becomes available. When this happens, we read the whole
326 * set and mark as free all sockets that become available. We then select a
327 * slot using the same rules as above.
329 * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
331 * For any connection created, if the stored initcmd is not null, it will be
332 * executed as a command on the newly formed connection before the slot is
335 * If an error occurs, NULL is returned.
/*
 * ParallelSlotsGetIdle: pick a slot for the next command, mark it inUse and
 * return a pointer to it.
 *
 * sa:     slot array; must contain at least one slot (asserted)
 * dbname: database name passed to find_matching_idle_slot and connect_slot
 *
 * The candidates are tried in the order of the numbered comments below:
 * matching idle slot, unconnected slot, any idle slot (which is disconnected
 * and reconnected), and finally waiting in wait_on_slots.
 *
 * NOTE(review): the tests on offset between the choices, the enclosing retry
 * loop and the failure return are not visible in this view -- confirm against
 * the full source.
 */
338 ParallelSlotsGetIdle(ParallelSlotArray
*sa
, const char *dbname
)
343 Assert(sa
->numslots
> 0);
347 /* First choice: a slot already connected to the desired database. */
348 offset
= find_matching_idle_slot(sa
, dbname
);
351 sa
->slots
[offset
].inUse
= true;
352 return &sa
->slots
[offset
];
355 /* Second choice: a slot not connected to any database. */
356 offset
= find_unconnected_slot(sa
);
/* The slot has no connection yet, so open one before handing it out. */
359 connect_slot(sa
, offset
, dbname
);
360 sa
->slots
[offset
].inUse
= true;
361 return &sa
->slots
[offset
];
364 /* Third choice: a slot connected to the wrong database. */
365 offset
= find_any_idle_slot(sa
);
/* Drop the existing connection and clear the pointer before reconnecting. */
368 disconnectDatabase(sa
->slots
[offset
].connection
);
369 sa
->slots
[offset
].connection
= NULL
;
370 connect_slot(sa
, offset
, dbname
);
371 sa
->slots
[offset
].inUse
= true;
372 return &sa
->slots
[offset
];
376 * Fourth choice: block until one or more slots become available. If
377 * any slots hit a fatal error, we'll find out about that here and
380 if (!wait_on_slots(sa
))
387 * Prepare a set of parallel slots but do not connect to any database.
389 * This creates and initializes a set of slots, marking all parallel slots as
390 * free and ready to use. Establishing connections is delayed until requesting
391 * a free slot. The cparams, progname, echo, and initcmd are stored for later
392 * use and must remain valid for the lifetime of the returned array.
/*
 * ParallelSlotsSetup: allocate a zero-filled ParallelSlotArray with room for
 * numslots slots and record the connection settings in it.
 *
 * numslots: number of slots; must be greater than zero (asserted)
 * cparams:  connection parameters; must not be NULL (asserted); the pointer
 *           itself is stored, not a copy
 * progname: program name; must not be NULL (asserted); pointer is stored
 * echo:     echo flag
 * initcmd:  command stored in sa->initcmd; pointer is stored
 *
 * NOTE(review): the store of echo and the return statement are not visible
 * in this view -- confirm against the full source.
 */
395 ParallelSlotsSetup(int numslots
, ConnParams
*cparams
, const char *progname
,
396 bool echo
, const char *initcmd
)
398 ParallelSlotArray
*sa
;
400 Assert(numslots
> 0);
401 Assert(cparams
!= NULL
);
402 Assert(progname
!= NULL
);
/* Size is the array header up to the slots member plus numslots ParallelSlot entries; palloc0 zero-fills it. */
404 sa
= (ParallelSlotArray
*) palloc0(offsetof(ParallelSlotArray
, slots
) +
405 numslots
* sizeof(ParallelSlot
));
407 sa
->numslots
= numslots
;
408 sa
->cparams
= cparams
;
409 sa
->progname
= progname
;
411 sa
->initcmd
= initcmd
;
417 * ParallelSlotsAdoptConn
418 * Assign an open connection to the slots array for reuse.
420 * This turns over ownership of an open connection to a slots array. The
421 * caller should not further use or close the connection. All the connection's
422 * parameters (user, host, port, etc.) except possibly dbname should match
423 * those of the slots array's cparams, as given in ParallelSlotsSetup. If
424 * these parameters differ, subsequent behavior is undefined.
/*
 * ParallelSlotsAdoptConn: place an already open connection into a slot found
 * by find_unconnected_slot.
 *
 * sa:   slot array that takes over the connection
 * conn: open connection handed over by the caller
 *
 * NOTE(review): the condition selecting between storing conn in the slot and
 * calling disconnectDatabase(conn) is not visible in this view; presumably
 * the connection is closed when no unconnected slot exists -- confirm against
 * the full source.
 */
427 ParallelSlotsAdoptConn(ParallelSlotArray
*sa
, PGconn
*conn
)
431 offset
= find_unconnected_slot(sa
);
433 sa
->slots
[offset
].connection
= conn
;
435 disconnectDatabase(conn
);
439 * ParallelSlotsTerminate
440 * Clean up a set of parallel slots
442 * Iterate through all connections in a given set of ParallelSlots and
443 * terminate all connections.
/*
 * ParallelSlotsTerminate: walk every slot in the array and call
 * disconnectDatabase on its connection.
 *
 * sa: slot array whose entries 0 .. numslots - 1 are visited
 *
 * NOTE(review): any guard that skips slots without a connection is not
 * visible in this view -- confirm against the full source.
 */
446 ParallelSlotsTerminate(ParallelSlotArray
*sa
)
450 for (i
= 0; i
< sa
->numslots
; i
++)
452 PGconn
*conn
= sa
->slots
[i
].connection
;
457 disconnectDatabase(conn
);
462 * ParallelSlotsWaitCompletion
464 * Wait for all connections to finish, returning false if at least one
465 * error has been found on the way.
/*
 * ParallelSlotsWaitCompletion: for every slot, drain its pending results
 * with consumeQueryResult and then mark the slot idle.
 *
 * sa: slot array whose entries 0 .. numslots - 1 are visited
 *
 * NOTE(review): the statements taken when a slot has no connection or when
 * consumeQueryResult fails, and the return statements, are not visible in
 * this view; the existing header comment says false is returned if at least
 * one error was found -- confirm against the full source.
 */
468 ParallelSlotsWaitCompletion(ParallelSlotArray
*sa
)
472 for (i
= 0; i
< sa
->numslots
; i
++)
/* Test: slot has no connection. */
474 if (sa
->slots
[i
].connection
== NULL
)
476 if (!consumeQueryResult(&sa
->slots
[i
]))
478 /* Mark connection as idle */
479 sa
->slots
[i
].inUse
= false;
480 ParallelSlotClearHandler(&sa
->slots
[i
]);
487 * TableCommandResultHandler
489 * ParallelSlotResultHandler for results of commands (not queries) against
492 * Requires that the result status is either PGRES_COMMAND_OK or an error about
493 * a missing table. This is useful for utilities that compile a list of tables
494 * to process and then run commands (vacuum, reindex, or whatever) against
495 * those tables, as there is a race condition between the time the list is
496 * compiled and the time the command attempts to open the table.
498 * For missing tables, logs an error but allows processing to continue.
500 * For all other errors, logs an error and terminates further processing.
502 * res: PGresult from the query executed on the slot's connection
503 * conn: connection belonging to the slot
507 TableCommandResultHandler(PGresult
*res
, PGconn
*conn
, void *context
)
510 Assert(conn
!= NULL
);
513 * If it's an error, report it. Errors about a missing table are harmless
514 * so we continue processing; but die for other errors.
516 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
518 char *sqlState
= PQresultErrorField(res
, PG_DIAG_SQLSTATE
);
520 pg_log_error("processing of database \"%s\" failed: %s",
521 PQdb(conn
), PQerrorMessage(conn
));
523 if (sqlState
&& strcmp(sqlState
, ERRCODE_UNDEFINED_TABLE
) != 0)