/*-------------------------------------------------------------------------
 *
 * Parallel support for front-end parallel database connections
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/fe_utils/parallel_slot.c
 *
 *-------------------------------------------------------------------------
 */
16 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
19 #include "postgres_fe.h"
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>
25 #include "common/logging.h"
26 #include "fe_utils/cancel.h"
27 #include "fe_utils/parallel_slot.h"
28 #include "fe_utils/query_utils.h"
30 #define ERRCODE_UNDEFINED_TABLE "42P01"
32 static int select_loop(int maxFd
, fd_set
*workerset
);
33 static bool processQueryResult(ParallelSlot
*slot
, PGresult
*result
);
36 * Process (and delete) a query result. Returns true if there's no problem,
37 * false otherwise. It's up to the handler to decide what constitutes a
41 processQueryResult(ParallelSlot
*slot
, PGresult
*result
)
43 Assert(slot
->handler
!= NULL
);
45 /* On failure, the handler should return NULL after freeing the result */
46 if (!slot
->handler(result
, slot
->connection
, slot
->handler_context
))
49 /* Ok, we have to free it ourself */
55 * Consume all the results generated for the given connection until
56 * nothing remains. If at least one error is encountered, return false.
57 * Note that this will block if the connection is busy.
60 consumeQueryResult(ParallelSlot
*slot
)
65 SetCancelConn(slot
->connection
);
66 while ((result
= PQgetResult(slot
->connection
)) != NULL
)
68 if (!processQueryResult(slot
, result
))
76 * Wait until a file descriptor from the given set becomes readable.
78 * Returns the number of ready descriptors, or -1 on failure (including
79 * getting a cancel request).
82 select_loop(int maxFd
, fd_set
*workerset
)
85 fd_set saveSet
= *workerset
;
93 * On Windows, we need to check once in a while for cancel requests;
94 * on other platforms we rely on select() returning when interrupted.
98 struct timeval tv
= {0, 1000000};
105 *workerset
= saveSet
;
106 i
= select(maxFd
+ 1, workerset
, NULL
, NULL
, tvp
);
109 if (i
== SOCKET_ERROR
)
113 if (WSAGetLastError() == WSAEINTR
)
118 if (i
< 0 && errno
== EINTR
)
119 continue; /* ignore this */
120 if (i
< 0 || CancelRequested
)
121 return -1; /* but not this */
123 continue; /* timeout (Win32 only) */
131 * Return the offset of a suitable idle slot, or -1 if none are available. If
132 * the given dbname is not null, only idle slots connected to the given
133 * database are considered suitable, otherwise all idle connected slots are
134 * considered suitable.
137 find_matching_idle_slot(const ParallelSlotArray
*sa
, const char *dbname
)
141 for (i
= 0; i
< sa
->numslots
; i
++)
143 if (sa
->slots
[i
].inUse
)
146 if (sa
->slots
[i
].connection
== NULL
)
149 if (dbname
== NULL
||
150 strcmp(PQdb(sa
->slots
[i
].connection
), dbname
) == 0)
157 * Return the offset of the first slot without a database connection, or -1 if
158 * all slots are connected.
161 find_unconnected_slot(const ParallelSlotArray
*sa
)
165 for (i
= 0; i
< sa
->numslots
; i
++)
167 if (sa
->slots
[i
].inUse
)
170 if (sa
->slots
[i
].connection
== NULL
)
178 * Return the offset of the first idle slot, or -1 if all slots are busy.
181 find_any_idle_slot(const ParallelSlotArray
*sa
)
185 for (i
= 0; i
< sa
->numslots
; i
++)
186 if (!sa
->slots
[i
].inUse
)
193 * Wait for any slot's connection to have query results, consume the results,
194 * and update the slot's status as appropriate. Returns true on success,
195 * false on cancellation, on error, or if no slots are connected.
198 wait_on_slots(ParallelSlotArray
*sa
)
203 PGconn
*cancelconn
= NULL
;
205 /* We must reconstruct the fd_set for each call to select_loop */
208 for (i
= 0; i
< sa
->numslots
; i
++)
212 /* We shouldn't get here if we still have slots without connections */
213 Assert(sa
->slots
[i
].connection
!= NULL
);
215 sock
= PQsocket(sa
->slots
[i
].connection
);
218 * We don't really expect any connections to lose their sockets after
219 * startup, but just in case, cope by ignoring them.
224 /* Keep track of the first valid connection we see. */
225 if (cancelconn
== NULL
)
226 cancelconn
= sa
->slots
[i
].connection
;
228 FD_SET(sock
, &slotset
);
234 * If we get this far with no valid connections, processing cannot
237 if (cancelconn
== NULL
)
240 SetCancelConn(sa
->slots
->connection
);
241 i
= select_loop(maxFd
, &slotset
);
248 for (i
= 0; i
< sa
->numslots
; i
++)
252 sock
= PQsocket(sa
->slots
[i
].connection
);
254 if (sock
>= 0 && FD_ISSET(sock
, &slotset
))
256 /* select() says input is available, so consume it */
257 PQconsumeInput(sa
->slots
[i
].connection
);
260 /* Collect result(s) as long as any are available */
261 while (!PQisBusy(sa
->slots
[i
].connection
))
263 PGresult
*result
= PQgetResult(sa
->slots
[i
].connection
);
267 /* Handle and discard the command result */
268 if (!processQueryResult(&sa
->slots
[i
], result
))
273 /* This connection has become idle */
274 sa
->slots
[i
].inUse
= false;
275 ParallelSlotClearHandler(&sa
->slots
[i
]);
284 * Open a new database connection using the stored connection parameters and
285 * optionally a given dbname if not null, execute the stored initial command if
286 * any, and associate the new connection with the given slot.
289 connect_slot(ParallelSlotArray
*sa
, int slotno
, const char *dbname
)
291 const char *old_override
;
292 ParallelSlot
*slot
= &sa
->slots
[slotno
];
294 old_override
= sa
->cparams
->override_dbname
;
296 sa
->cparams
->override_dbname
= dbname
;
297 slot
->connection
= connectDatabase(sa
->cparams
, sa
->progname
, sa
->echo
, false, true);
298 sa
->cparams
->override_dbname
= old_override
;
300 if (PQsocket(slot
->connection
) >= FD_SETSIZE
)
302 pg_log_fatal("too many jobs for this platform");
306 /* Setup the connection using the supplied command, if any. */
308 executeCommand(slot
->connection
, sa
->initcmd
, sa
->echo
);
312 * ParallelSlotsGetIdle
313 * Return a connection slot that is ready to execute a command.
315 * The slot returned is chosen as follows:
317 * If any idle slot already has an open connection, and if either dbname is
318 * null or the existing connection is to the given database, that slot will be
319 * returned allowing the connection to be reused.
321 * Otherwise, if any idle slot is not yet connected to any database, the slot
322 * will be returned with it's connection opened using the stored cparams and
323 * optionally the given dbname if not null.
325 * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
326 * after having it's connection disconnected and reconnected using the stored
327 * cparams and optionally the given dbname if not null.
329 * Otherwise, if any slots have connections that are busy, we loop on select()
330 * until one socket becomes available. When this happens, we read the whole
331 * set and mark as free all sockets that become available. We then select a
332 * slot using the same rules as above.
334 * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
336 * For any connection created, if the stored initcmd is not null, it will be
337 * executed as a command on the newly formed connection before the slot is
340 * If an error occurs, NULL is returned.
343 ParallelSlotsGetIdle(ParallelSlotArray
*sa
, const char *dbname
)
348 Assert(sa
->numslots
> 0);
352 /* First choice: a slot already connected to the desired database. */
353 offset
= find_matching_idle_slot(sa
, dbname
);
356 sa
->slots
[offset
].inUse
= true;
357 return &sa
->slots
[offset
];
360 /* Second choice: a slot not connected to any database. */
361 offset
= find_unconnected_slot(sa
);
364 connect_slot(sa
, offset
, dbname
);
365 sa
->slots
[offset
].inUse
= true;
366 return &sa
->slots
[offset
];
369 /* Third choice: a slot connected to the wrong database. */
370 offset
= find_any_idle_slot(sa
);
373 disconnectDatabase(sa
->slots
[offset
].connection
);
374 sa
->slots
[offset
].connection
= NULL
;
375 connect_slot(sa
, offset
, dbname
);
376 sa
->slots
[offset
].inUse
= true;
377 return &sa
->slots
[offset
];
381 * Fourth choice: block until one or more slots become available. If
382 * any slots hit a fatal error, we'll find out about that here and
385 if (!wait_on_slots(sa
))
392 * Prepare a set of parallel slots but do not connect to any database.
394 * This creates and initializes a set of slots, marking all parallel slots as
395 * free and ready to use. Establishing connections is delayed until requesting
396 * a free slot. The cparams, progname, echo, and initcmd are stored for later
397 * use and must remain valid for the lifetime of the returned array.
400 ParallelSlotsSetup(int numslots
, ConnParams
*cparams
, const char *progname
,
401 bool echo
, const char *initcmd
)
403 ParallelSlotArray
*sa
;
405 Assert(numslots
> 0);
406 Assert(cparams
!= NULL
);
407 Assert(progname
!= NULL
);
409 sa
= (ParallelSlotArray
*) palloc0(offsetof(ParallelSlotArray
, slots
) +
410 numslots
* sizeof(ParallelSlot
));
412 sa
->numslots
= numslots
;
413 sa
->cparams
= cparams
;
414 sa
->progname
= progname
;
416 sa
->initcmd
= initcmd
;
422 * ParallelSlotsAdoptConn
423 * Assign an open connection to the slots array for reuse.
425 * This turns over ownership of an open connection to a slots array. The
426 * caller should not further use or close the connection. All the connection's
427 * parameters (user, host, port, etc.) except possibly dbname should match
428 * those of the slots array's cparams, as given in ParallelSlotsSetup. If
429 * these parameters differ, subsequent behavior is undefined.
432 ParallelSlotsAdoptConn(ParallelSlotArray
*sa
, PGconn
*conn
)
436 offset
= find_unconnected_slot(sa
);
438 sa
->slots
[offset
].connection
= conn
;
440 disconnectDatabase(conn
);
444 * ParallelSlotsTerminate
445 * Clean up a set of parallel slots
447 * Iterate through all connections in a given set of ParallelSlots and
448 * terminate all connections.
451 ParallelSlotsTerminate(ParallelSlotArray
*sa
)
455 for (i
= 0; i
< sa
->numslots
; i
++)
457 PGconn
*conn
= sa
->slots
[i
].connection
;
462 disconnectDatabase(conn
);
467 * ParallelSlotsWaitCompletion
469 * Wait for all connections to finish, returning false if at least one
470 * error has been found on the way.
473 ParallelSlotsWaitCompletion(ParallelSlotArray
*sa
)
477 for (i
= 0; i
< sa
->numslots
; i
++)
479 if (sa
->slots
[i
].connection
== NULL
)
481 if (!consumeQueryResult(&sa
->slots
[i
]))
/*
 * TableCommandResultHandler
 *
 * ParallelSlotResultHandler for results of commands (not queries) against
 * tables.
 *
 * Requires that the result status is either PGRES_COMMAND_OK or an error about
 * a missing table.  This is useful for utilities that compile a list of tables
 * to process and then run commands (vacuum, reindex, or whatever) against
 * those tables, as there is a race condition between the time the list is
 * compiled and the time the command attempts to open the table.
 *
 * For missing tables, logs an error but allows processing to continue.
 *
 * For all other errors, logs an error and terminates further processing.
 *
 * res: PGresult from the query executed on the slot's connection
 * conn: connection belonging to the slot
 */
509 TableCommandResultHandler(PGresult
*res
, PGconn
*conn
, void *context
)
512 Assert(conn
!= NULL
);
515 * If it's an error, report it. Errors about a missing table are harmless
516 * so we continue processing; but die for other errors.
518 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
520 char *sqlState
= PQresultErrorField(res
, PG_DIAG_SQLSTATE
);
522 pg_log_error("processing of database \"%s\" failed: %s",
523 PQdb(conn
), PQerrorMessage(conn
));
525 if (sqlState
&& strcmp(sqlState
, ERRCODE_UNDEFINED_TABLE
) != 0)