Consistently use "superuser" instead of "super user"
[pgsql.git] / src / fe_utils / parallel_slot.c
blob69581157c29a2ca12f626ee20a75a54d4b181ed1
1 /*-------------------------------------------------------------------------
3 * parallel_slot.c
4 * Parallel support for front-end parallel database connections
7 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/fe_utils/parallel_slot.c
12 *-------------------------------------------------------------------------
15 #ifdef WIN32
16 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
17 #endif
19 #include "postgres_fe.h"
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>
23 #endif
25 #include "common/logging.h"
26 #include "fe_utils/cancel.h"
27 #include "fe_utils/parallel_slot.h"
28 #include "fe_utils/query_utils.h"
30 #define ERRCODE_UNDEFINED_TABLE "42P01"
32 static int select_loop(int maxFd, fd_set *workerset);
33 static bool processQueryResult(ParallelSlot *slot, PGresult *result);
36 * Process (and delete) a query result. Returns true if there's no problem,
37 * false otherwise. It's up to the handler to decide what constitutes a
38 * problem.
40 static bool
41 processQueryResult(ParallelSlot *slot, PGresult *result)
43 Assert(slot->handler != NULL);
45 /* On failure, the handler should return NULL after freeing the result */
46 if (!slot->handler(result, slot->connection, slot->handler_context))
47 return false;
49 /* Ok, we have to free it ourself */
50 PQclear(result);
51 return true;
55 * Consume all the results generated for the given connection until
56 * nothing remains. If at least one error is encountered, return false.
57 * Note that this will block if the connection is busy.
59 static bool
60 consumeQueryResult(ParallelSlot *slot)
62 bool ok = true;
63 PGresult *result;
65 SetCancelConn(slot->connection);
66 while ((result = PQgetResult(slot->connection)) != NULL)
68 if (!processQueryResult(slot, result))
69 ok = false;
71 ResetCancelConn();
72 return ok;
76 * Wait until a file descriptor from the given set becomes readable.
78 * Returns the number of ready descriptors, or -1 on failure (including
79 * getting a cancel request).
81 static int
82 select_loop(int maxFd, fd_set *workerset)
84 int i;
85 fd_set saveSet = *workerset;
87 if (CancelRequested)
88 return -1;
90 for (;;)
93 * On Windows, we need to check once in a while for cancel requests;
94 * on other platforms we rely on select() returning when interrupted.
96 struct timeval *tvp;
97 #ifdef WIN32
98 struct timeval tv = {0, 1000000};
100 tvp = &tv;
101 #else
102 tvp = NULL;
103 #endif
105 *workerset = saveSet;
106 i = select(maxFd + 1, workerset, NULL, NULL, tvp);
108 #ifdef WIN32
109 if (i == SOCKET_ERROR)
111 i = -1;
113 if (WSAGetLastError() == WSAEINTR)
114 errno = EINTR;
116 #endif
118 if (i < 0 && errno == EINTR)
119 continue; /* ignore this */
120 if (i < 0 || CancelRequested)
121 return -1; /* but not this */
122 if (i == 0)
123 continue; /* timeout (Win32 only) */
124 break;
127 return i;
131 * Return the offset of a suitable idle slot, or -1 if none are available. If
132 * the given dbname is not null, only idle slots connected to the given
133 * database are considered suitable, otherwise all idle connected slots are
134 * considered suitable.
136 static int
137 find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
139 int i;
141 for (i = 0; i < sa->numslots; i++)
143 if (sa->slots[i].inUse)
144 continue;
146 if (sa->slots[i].connection == NULL)
147 continue;
149 if (dbname == NULL ||
150 strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
151 return i;
153 return -1;
157 * Return the offset of the first slot without a database connection, or -1 if
158 * all slots are connected.
160 static int
161 find_unconnected_slot(const ParallelSlotArray *sa)
163 int i;
165 for (i = 0; i < sa->numslots; i++)
167 if (sa->slots[i].inUse)
168 continue;
170 if (sa->slots[i].connection == NULL)
171 return i;
174 return -1;
178 * Return the offset of the first idle slot, or -1 if all slots are busy.
180 static int
181 find_any_idle_slot(const ParallelSlotArray *sa)
183 int i;
185 for (i = 0; i < sa->numslots; i++)
186 if (!sa->slots[i].inUse)
187 return i;
189 return -1;
193 * Wait for any slot's connection to have query results, consume the results,
194 * and update the slot's status as appropriate. Returns true on success,
195 * false on cancellation, on error, or if no slots are connected.
197 static bool
198 wait_on_slots(ParallelSlotArray *sa)
200 int i;
201 fd_set slotset;
202 int maxFd = 0;
203 PGconn *cancelconn = NULL;
205 /* We must reconstruct the fd_set for each call to select_loop */
206 FD_ZERO(&slotset);
208 for (i = 0; i < sa->numslots; i++)
210 int sock;
212 /* We shouldn't get here if we still have slots without connections */
213 Assert(sa->slots[i].connection != NULL);
215 sock = PQsocket(sa->slots[i].connection);
218 * We don't really expect any connections to lose their sockets after
219 * startup, but just in case, cope by ignoring them.
221 if (sock < 0)
222 continue;
224 /* Keep track of the first valid connection we see. */
225 if (cancelconn == NULL)
226 cancelconn = sa->slots[i].connection;
228 FD_SET(sock, &slotset);
229 if (sock > maxFd)
230 maxFd = sock;
234 * If we get this far with no valid connections, processing cannot
235 * continue.
237 if (cancelconn == NULL)
238 return false;
240 SetCancelConn(sa->slots->connection);
241 i = select_loop(maxFd, &slotset);
242 ResetCancelConn();
244 /* failure? */
245 if (i < 0)
246 return false;
248 for (i = 0; i < sa->numslots; i++)
250 int sock;
252 sock = PQsocket(sa->slots[i].connection);
254 if (sock >= 0 && FD_ISSET(sock, &slotset))
256 /* select() says input is available, so consume it */
257 PQconsumeInput(sa->slots[i].connection);
260 /* Collect result(s) as long as any are available */
261 while (!PQisBusy(sa->slots[i].connection))
263 PGresult *result = PQgetResult(sa->slots[i].connection);
265 if (result != NULL)
267 /* Handle and discard the command result */
268 if (!processQueryResult(&sa->slots[i], result))
269 return false;
271 else
273 /* This connection has become idle */
274 sa->slots[i].inUse = false;
275 ParallelSlotClearHandler(&sa->slots[i]);
276 break;
280 return true;
284 * Open a new database connection using the stored connection parameters and
285 * optionally a given dbname if not null, execute the stored initial command if
286 * any, and associate the new connection with the given slot.
288 static void
289 connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
291 const char *old_override;
292 ParallelSlot *slot = &sa->slots[slotno];
294 old_override = sa->cparams->override_dbname;
295 if (dbname)
296 sa->cparams->override_dbname = dbname;
297 slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
298 sa->cparams->override_dbname = old_override;
300 if (PQsocket(slot->connection) >= FD_SETSIZE)
302 pg_log_fatal("too many jobs for this platform");
303 exit(1);
306 /* Setup the connection using the supplied command, if any. */
307 if (sa->initcmd)
308 executeCommand(slot->connection, sa->initcmd, sa->echo);
312 * ParallelSlotsGetIdle
313 * Return a connection slot that is ready to execute a command.
315 * The slot returned is chosen as follows:
317 * If any idle slot already has an open connection, and if either dbname is
318 * null or the existing connection is to the given database, that slot will be
319 * returned allowing the connection to be reused.
321 * Otherwise, if any idle slot is not yet connected to any database, the slot
322 * will be returned with it's connection opened using the stored cparams and
323 * optionally the given dbname if not null.
325 * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
326 * after having it's connection disconnected and reconnected using the stored
327 * cparams and optionally the given dbname if not null.
329 * Otherwise, if any slots have connections that are busy, we loop on select()
330 * until one socket becomes available. When this happens, we read the whole
331 * set and mark as free all sockets that become available. We then select a
332 * slot using the same rules as above.
334 * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
336 * For any connection created, if the stored initcmd is not null, it will be
337 * executed as a command on the newly formed connection before the slot is
338 * returned.
340 * If an error occurs, NULL is returned.
342 ParallelSlot *
343 ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
345 int offset;
347 Assert(sa);
348 Assert(sa->numslots > 0);
350 while (1)
352 /* First choice: a slot already connected to the desired database. */
353 offset = find_matching_idle_slot(sa, dbname);
354 if (offset >= 0)
356 sa->slots[offset].inUse = true;
357 return &sa->slots[offset];
360 /* Second choice: a slot not connected to any database. */
361 offset = find_unconnected_slot(sa);
362 if (offset >= 0)
364 connect_slot(sa, offset, dbname);
365 sa->slots[offset].inUse = true;
366 return &sa->slots[offset];
369 /* Third choice: a slot connected to the wrong database. */
370 offset = find_any_idle_slot(sa);
371 if (offset >= 0)
373 disconnectDatabase(sa->slots[offset].connection);
374 sa->slots[offset].connection = NULL;
375 connect_slot(sa, offset, dbname);
376 sa->slots[offset].inUse = true;
377 return &sa->slots[offset];
381 * Fourth choice: block until one or more slots become available. If
382 * any slots hit a fatal error, we'll find out about that here and
383 * return NULL.
385 if (!wait_on_slots(sa))
386 return NULL;
391 * ParallelSlotsSetup
392 * Prepare a set of parallel slots but do not connect to any database.
394 * This creates and initializes a set of slots, marking all parallel slots as
395 * free and ready to use. Establishing connections is delayed until requesting
396 * a free slot. The cparams, progname, echo, and initcmd are stored for later
397 * use and must remain valid for the lifetime of the returned array.
399 ParallelSlotArray *
400 ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
401 bool echo, const char *initcmd)
403 ParallelSlotArray *sa;
405 Assert(numslots > 0);
406 Assert(cparams != NULL);
407 Assert(progname != NULL);
409 sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
410 numslots * sizeof(ParallelSlot));
412 sa->numslots = numslots;
413 sa->cparams = cparams;
414 sa->progname = progname;
415 sa->echo = echo;
416 sa->initcmd = initcmd;
418 return sa;
422 * ParallelSlotsAdoptConn
423 * Assign an open connection to the slots array for reuse.
425 * This turns over ownership of an open connection to a slots array. The
426 * caller should not further use or close the connection. All the connection's
427 * parameters (user, host, port, etc.) except possibly dbname should match
428 * those of the slots array's cparams, as given in ParallelSlotsSetup. If
429 * these parameters differ, subsequent behavior is undefined.
431 void
432 ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
434 int offset;
436 offset = find_unconnected_slot(sa);
437 if (offset >= 0)
438 sa->slots[offset].connection = conn;
439 else
440 disconnectDatabase(conn);
444 * ParallelSlotsTerminate
445 * Clean up a set of parallel slots
447 * Iterate through all connections in a given set of ParallelSlots and
448 * terminate all connections.
450 void
451 ParallelSlotsTerminate(ParallelSlotArray *sa)
453 int i;
455 for (i = 0; i < sa->numslots; i++)
457 PGconn *conn = sa->slots[i].connection;
459 if (conn == NULL)
460 continue;
462 disconnectDatabase(conn);
467 * ParallelSlotsWaitCompletion
469 * Wait for all connections to finish, returning false if at least one
470 * error has been found on the way.
472 bool
473 ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
475 int i;
477 for (i = 0; i < sa->numslots; i++)
479 if (sa->slots[i].connection == NULL)
480 continue;
481 if (!consumeQueryResult(&sa->slots[i]))
482 return false;
485 return true;
489 * TableCommandResultHandler
491 * ParallelSlotResultHandler for results of commands (not queries) against
492 * tables.
494 * Requires that the result status is either PGRES_COMMAND_OK or an error about
495 * a missing table. This is useful for utilities that compile a list of tables
496 * to process and then run commands (vacuum, reindex, or whatever) against
497 * those tables, as there is a race condition between the time the list is
498 * compiled and the time the command attempts to open the table.
500 * For missing tables, logs an error but allows processing to continue.
502 * For all other errors, logs an error and terminates further processing.
504 * res: PGresult from the query executed on the slot's connection
505 * conn: connection belonging to the slot
506 * context: unused
508 bool
509 TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
511 Assert(res != NULL);
512 Assert(conn != NULL);
515 * If it's an error, report it. Errors about a missing table are harmless
516 * so we continue processing; but die for other errors.
518 if (PQresultStatus(res) != PGRES_COMMAND_OK)
520 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
522 pg_log_error("processing of database \"%s\" failed: %s",
523 PQdb(conn), PQerrorMessage(conn));
525 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
527 PQclear(res);
528 return false;
532 return true;