Fix use-after-free in parallel_vacuum_reset_dead_items
[pgsql.git] / src / test / modules / libpq_pipeline / libpq_pipeline.c
blob1323e4c598d7dc676b3753b9a4e5353ca584fd3f
1 /*-------------------------------------------------------------------------
3 * libpq_pipeline.c
4 * Verify libpq pipeline execution functionality
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * IDENTIFICATION
11 * src/test/modules/libpq_pipeline/libpq_pipeline.c
13 *-------------------------------------------------------------------------
16 #include "postgres_fe.h"
18 #include <sys/select.h>
19 #include <sys/time.h>
21 #include "catalog/pg_type_d.h"
22 #include "libpq-fe.h"
23 #include "pg_getopt.h"
26 static void exit_nicely(PGconn *conn);
27 static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
28 pg_attribute_printf(2, 3);
29 static bool process_result(PGconn *conn, PGresult *res, int results,
30 int numsent);
32 static const char *const progname = "libpq_pipeline";
34 /* Options and defaults */
35 static char *tracefile = NULL; /* path to PQtrace() file */
38 #ifdef DEBUG_OUTPUT
39 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40 #else
41 #define pg_debug(...)
42 #endif
44 static const char *const drop_table_sql =
45 "DROP TABLE IF EXISTS pq_pipeline_demo";
46 static const char *const create_table_sql =
47 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48 "int8filler int8);";
49 static const char *const insert_sql =
50 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51 static const char *const insert_sql2 =
52 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
54 /* max char length of an int32/64, plus sign and null terminator */
55 #define MAXINTLEN 12
56 #define MAXINT8LEN 20
58 static void
59 exit_nicely(PGconn *conn)
61 PQfinish(conn);
62 exit(1);
66 * The following few functions are wrapped in macros to make the reported line
67 * number in an error match the line number of the invocation.
71 * Print an error to stderr and terminate the program.
73 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
74 static void
75 pg_attribute_noreturn()
76 pg_fatal_impl(int line, const char *fmt,...)
78 va_list args;
80 fflush(stdout);
82 fprintf(stderr, "\n%s:%d: ", progname, line);
83 va_start(args, fmt);
84 vfprintf(stderr, fmt, args);
85 va_end(args);
86 Assert(fmt[strlen(fmt) - 1] != '\n');
87 fprintf(stderr, "\n");
88 exit(1);
92 * Check that the query on the given connection got canceled.
94 #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
95 static void
96 confirm_query_canceled_impl(int line, PGconn *conn)
98 PGresult *res = NULL;
100 res = PQgetResult(conn);
101 if (res == NULL)
102 pg_fatal_impl(line, "PQgetResult returned null: %s",
103 PQerrorMessage(conn));
104 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
105 pg_fatal_impl(line, "query did not fail when it was expected");
106 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
107 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
108 PQerrorMessage(conn));
109 PQclear(res);
111 while (PQisBusy(conn))
112 PQconsumeInput(conn);
116 * Using monitorConn, query pg_stat_activity to see that the connection with
117 * the given PID is either in the given state, or waiting on the given event
118 * (only one of them can be given).
120 static void
121 wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
122 char *state, char *event)
124 const Oid paramTypes[] = {INT4OID, TEXTOID};
125 const char *paramValues[2];
126 char *pidstr = psprintf("%d", procpid);
128 Assert((state == NULL) ^ (event == NULL));
130 paramValues[0] = pidstr;
131 paramValues[1] = state ? state : event;
133 while (true)
135 PGresult *res;
136 char *value;
138 if (state != NULL)
139 res = PQexecParams(monitorConn,
140 "SELECT count(*) FROM pg_stat_activity WHERE "
141 "pid = $1 AND state = $2",
142 2, paramTypes, paramValues, NULL, NULL, 0);
143 else
144 res = PQexecParams(monitorConn,
145 "SELECT count(*) FROM pg_stat_activity WHERE "
146 "pid = $1 AND wait_event = $2",
147 2, paramTypes, paramValues, NULL, NULL, 0);
149 if (PQresultStatus(res) != PGRES_TUPLES_OK)
150 pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
151 if (PQntuples(res) != 1)
152 pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
153 if (PQnfields(res) != 1)
154 pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
155 value = PQgetvalue(res, 0, 0);
156 if (strcmp(value, "0") != 0)
158 PQclear(res);
159 break;
161 PQclear(res);
163 /* wait 10ms before polling again */
164 pg_usleep(10000);
167 pfree(pidstr);
170 #define send_cancellable_query(conn, monitorConn) \
171 send_cancellable_query_impl(__LINE__, conn, monitorConn)
172 static void
173 send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
175 const char *env_wait;
176 const Oid paramTypes[1] = {INT4OID};
179 * Wait for the connection to be idle, so that our check for an active
180 * connection below is reliable, instead of possibly seeing an outdated
181 * state.
183 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
185 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
186 if (env_wait == NULL)
187 env_wait = "180";
189 if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
190 &env_wait, NULL, NULL, 0) != 1)
191 pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
194 * Wait for the sleep to be active, because if the query is not running
195 * yet, the cancel request that we send won't have any effect.
197 wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
201 * Create a new connection with the same conninfo as the given one.
203 static PGconn *
204 copy_connection(PGconn *conn)
206 PGconn *copyConn;
207 PQconninfoOption *opts = PQconninfo(conn);
208 const char **keywords;
209 const char **vals;
210 int nopts = 1;
211 int i = 0;
213 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
214 nopts++;
216 keywords = pg_malloc(sizeof(char *) * nopts);
217 vals = pg_malloc(sizeof(char *) * nopts);
219 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
221 if (opt->val)
223 keywords[i] = opt->keyword;
224 vals[i] = opt->val;
225 i++;
228 keywords[i] = vals[i] = NULL;
230 copyConn = PQconnectdbParams(keywords, vals, false);
232 if (PQstatus(copyConn) != CONNECTION_OK)
233 pg_fatal("Connection to database failed: %s",
234 PQerrorMessage(copyConn));
236 return copyConn;
240 * Test query cancellation routines
242 static void
243 test_cancel(PGconn *conn)
245 PGcancel *cancel;
246 PGcancelConn *cancelConn;
247 PGconn *monitorConn;
248 char errorbuf[256];
250 fprintf(stderr, "test cancellations... ");
252 if (PQsetnonblocking(conn, 1) != 0)
253 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
256 * Make a separate connection to the database to monitor the query on the
257 * main connection.
259 monitorConn = copy_connection(conn);
260 Assert(PQstatus(monitorConn) == CONNECTION_OK);
262 /* test PQcancel */
263 send_cancellable_query(conn, monitorConn);
264 cancel = PQgetCancel(conn);
265 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
266 pg_fatal("failed to run PQcancel: %s", errorbuf);
267 confirm_query_canceled(conn);
269 /* PGcancel object can be reused for the next query */
270 send_cancellable_query(conn, monitorConn);
271 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
272 pg_fatal("failed to run PQcancel: %s", errorbuf);
273 confirm_query_canceled(conn);
275 PQfreeCancel(cancel);
277 /* test PQrequestCancel */
278 send_cancellable_query(conn, monitorConn);
279 if (!PQrequestCancel(conn))
280 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
281 confirm_query_canceled(conn);
283 /* test PQcancelBlocking */
284 send_cancellable_query(conn, monitorConn);
285 cancelConn = PQcancelCreate(conn);
286 if (!PQcancelBlocking(cancelConn))
287 pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
288 confirm_query_canceled(conn);
289 PQcancelFinish(cancelConn);
291 /* test PQcancelCreate and then polling with PQcancelPoll */
292 send_cancellable_query(conn, monitorConn);
293 cancelConn = PQcancelCreate(conn);
294 if (!PQcancelStart(cancelConn))
295 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
296 while (true)
298 struct timeval tv;
299 fd_set input_mask;
300 fd_set output_mask;
301 PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
302 int sock = PQcancelSocket(cancelConn);
304 if (pollres == PGRES_POLLING_OK)
305 break;
307 FD_ZERO(&input_mask);
308 FD_ZERO(&output_mask);
309 switch (pollres)
311 case PGRES_POLLING_READING:
312 pg_debug("polling for reads\n");
313 FD_SET(sock, &input_mask);
314 break;
315 case PGRES_POLLING_WRITING:
316 pg_debug("polling for writes\n");
317 FD_SET(sock, &output_mask);
318 break;
319 default:
320 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
323 if (sock < 0)
324 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
326 tv.tv_sec = 3;
327 tv.tv_usec = 0;
329 while (true)
331 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
333 if (errno == EINTR)
334 continue;
335 pg_fatal("select() failed: %m");
337 break;
340 if (PQcancelStatus(cancelConn) != CONNECTION_OK)
341 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
342 confirm_query_canceled(conn);
345 * test PQcancelReset works on the cancel connection and it can be reused
346 * afterwards
348 PQcancelReset(cancelConn);
350 send_cancellable_query(conn, monitorConn);
351 if (!PQcancelStart(cancelConn))
352 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
353 while (true)
355 struct timeval tv;
356 fd_set input_mask;
357 fd_set output_mask;
358 PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
359 int sock = PQcancelSocket(cancelConn);
361 if (pollres == PGRES_POLLING_OK)
362 break;
364 FD_ZERO(&input_mask);
365 FD_ZERO(&output_mask);
366 switch (pollres)
368 case PGRES_POLLING_READING:
369 pg_debug("polling for reads\n");
370 FD_SET(sock, &input_mask);
371 break;
372 case PGRES_POLLING_WRITING:
373 pg_debug("polling for writes\n");
374 FD_SET(sock, &output_mask);
375 break;
376 default:
377 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
380 if (sock < 0)
381 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
383 tv.tv_sec = 3;
384 tv.tv_usec = 0;
386 while (true)
388 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
390 if (errno == EINTR)
391 continue;
392 pg_fatal("select() failed: %m");
394 break;
397 if (PQcancelStatus(cancelConn) != CONNECTION_OK)
398 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
399 confirm_query_canceled(conn);
401 PQcancelFinish(cancelConn);
403 fprintf(stderr, "ok\n");
406 static void
407 test_disallowed_in_pipeline(PGconn *conn)
409 PGresult *res = NULL;
411 fprintf(stderr, "test error cases... ");
413 if (PQisnonblocking(conn))
414 pg_fatal("Expected blocking connection mode");
416 if (PQenterPipelineMode(conn) != 1)
417 pg_fatal("Unable to enter pipeline mode");
419 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
420 pg_fatal("Pipeline mode not activated properly");
422 /* PQexec should fail in pipeline mode */
423 res = PQexec(conn, "SELECT 1");
424 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
425 pg_fatal("PQexec should fail in pipeline mode but succeeded");
426 if (strcmp(PQerrorMessage(conn),
427 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
428 pg_fatal("did not get expected error message; got: \"%s\"",
429 PQerrorMessage(conn));
431 /* PQsendQuery should fail in pipeline mode */
432 if (PQsendQuery(conn, "SELECT 1") != 0)
433 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
434 if (strcmp(PQerrorMessage(conn),
435 "PQsendQuery not allowed in pipeline mode\n") != 0)
436 pg_fatal("did not get expected error message; got: \"%s\"",
437 PQerrorMessage(conn));
439 /* Entering pipeline mode when already in pipeline mode is OK */
440 if (PQenterPipelineMode(conn) != 1)
441 pg_fatal("re-entering pipeline mode should be a no-op but failed");
443 if (PQisBusy(conn) != 0)
444 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
446 /* ok, back to normal command mode */
447 if (PQexitPipelineMode(conn) != 1)
448 pg_fatal("couldn't exit idle empty pipeline mode");
450 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
451 pg_fatal("Pipeline mode not terminated properly");
453 /* exiting pipeline mode when not in pipeline mode should be a no-op */
454 if (PQexitPipelineMode(conn) != 1)
455 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
457 /* can now PQexec again */
458 res = PQexec(conn, "SELECT 1");
459 if (PQresultStatus(res) != PGRES_TUPLES_OK)
460 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
461 PQerrorMessage(conn));
463 fprintf(stderr, "ok\n");
466 static void
467 test_multi_pipelines(PGconn *conn)
469 PGresult *res = NULL;
470 const char *dummy_params[1] = {"1"};
471 Oid dummy_param_oids[1] = {INT4OID};
473 fprintf(stderr, "multi pipeline... ");
476 * Queue up a couple of small pipelines and process each without returning
477 * to command mode first.
479 if (PQenterPipelineMode(conn) != 1)
480 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
482 /* first pipeline */
483 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
484 dummy_params, NULL, NULL, 0) != 1)
485 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
487 if (PQpipelineSync(conn) != 1)
488 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
490 /* second pipeline */
491 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
492 dummy_params, NULL, NULL, 0) != 1)
493 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
495 /* Skip flushing once. */
496 if (PQsendPipelineSync(conn) != 1)
497 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
499 /* third pipeline */
500 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
501 dummy_params, NULL, NULL, 0) != 1)
502 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
504 if (PQpipelineSync(conn) != 1)
505 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
507 /* OK, start processing the results */
509 /* first pipeline */
511 res = PQgetResult(conn);
512 if (res == NULL)
513 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
514 PQerrorMessage(conn));
516 if (PQresultStatus(res) != PGRES_TUPLES_OK)
517 pg_fatal("Unexpected result code %s from first pipeline item",
518 PQresStatus(PQresultStatus(res)));
519 PQclear(res);
520 res = NULL;
522 if (PQgetResult(conn) != NULL)
523 pg_fatal("PQgetResult returned something extra after first result");
525 if (PQexitPipelineMode(conn) != 0)
526 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
528 res = PQgetResult(conn);
529 if (res == NULL)
530 pg_fatal("PQgetResult returned null when sync result expected: %s",
531 PQerrorMessage(conn));
533 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
534 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
535 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
536 PQclear(res);
538 /* second pipeline */
540 res = PQgetResult(conn);
541 if (res == NULL)
542 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
543 PQerrorMessage(conn));
545 if (PQresultStatus(res) != PGRES_TUPLES_OK)
546 pg_fatal("Unexpected result code %s from second pipeline item",
547 PQresStatus(PQresultStatus(res)));
548 PQclear(res);
549 res = NULL;
551 if (PQgetResult(conn) != NULL)
552 pg_fatal("PQgetResult returned something extra after first result");
554 if (PQexitPipelineMode(conn) != 0)
555 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
557 res = PQgetResult(conn);
558 if (res == NULL)
559 pg_fatal("PQgetResult returned null when sync result expected: %s",
560 PQerrorMessage(conn));
562 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
563 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
564 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
565 PQclear(res);
567 /* third pipeline */
569 res = PQgetResult(conn);
570 if (res == NULL)
571 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
572 PQerrorMessage(conn));
574 if (PQresultStatus(res) != PGRES_TUPLES_OK)
575 pg_fatal("Unexpected result code %s from third pipeline item",
576 PQresStatus(PQresultStatus(res)));
578 res = PQgetResult(conn);
579 if (res != NULL)
580 pg_fatal("Expected null result, got %s",
581 PQresStatus(PQresultStatus(res)));
583 res = PQgetResult(conn);
584 if (res == NULL)
585 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
586 PQerrorMessage(conn));
588 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
589 pg_fatal("Unexpected result code %s from second pipeline sync",
590 PQresStatus(PQresultStatus(res)));
592 /* We're still in pipeline mode ... */
593 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
594 pg_fatal("Fell out of pipeline mode somehow");
596 /* until we end it, which we can safely do now */
597 if (PQexitPipelineMode(conn) != 1)
598 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
599 PQerrorMessage(conn));
601 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
602 pg_fatal("exiting pipeline mode didn't seem to work");
604 fprintf(stderr, "ok\n");
608 * Test behavior when a pipeline dispatches a number of commands that are
609 * not flushed by a sync point.
611 static void
612 test_nosync(PGconn *conn)
614 int numqueries = 10;
615 int results = 0;
616 int sock = PQsocket(conn);
618 fprintf(stderr, "nosync... ");
620 if (sock < 0)
621 pg_fatal("invalid socket");
623 if (PQenterPipelineMode(conn) != 1)
624 pg_fatal("could not enter pipeline mode");
625 for (int i = 0; i < numqueries; i++)
627 fd_set input_mask;
628 struct timeval tv;
630 if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
631 0, NULL, NULL, NULL, NULL, 0) != 1)
632 pg_fatal("error sending select: %s", PQerrorMessage(conn));
633 PQflush(conn);
636 * If the server has written anything to us, read (some of) it now.
638 FD_ZERO(&input_mask);
639 FD_SET(sock, &input_mask);
640 tv.tv_sec = 0;
641 tv.tv_usec = 0;
642 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
644 fprintf(stderr, "select() failed: %m\n");
645 exit_nicely(conn);
647 if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
648 pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
651 /* tell server to flush its output buffer */
652 if (PQsendFlushRequest(conn) != 1)
653 pg_fatal("failed to send flush request");
654 PQflush(conn);
656 /* Now read all results */
657 for (;;)
659 PGresult *res;
661 res = PQgetResult(conn);
663 /* NULL results are only expected after TUPLES_OK */
664 if (res == NULL)
665 pg_fatal("got unexpected NULL result after %d results", results);
667 /* We expect exactly one TUPLES_OK result for each query we sent */
668 if (PQresultStatus(res) == PGRES_TUPLES_OK)
670 PGresult *res2;
672 /* and one NULL result should follow each */
673 res2 = PQgetResult(conn);
674 if (res2 != NULL)
675 pg_fatal("expected NULL, got %s",
676 PQresStatus(PQresultStatus(res2)));
677 PQclear(res);
678 results++;
680 /* if we're done, we're done */
681 if (results == numqueries)
682 break;
684 continue;
687 /* anything else is unexpected */
688 pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
691 fprintf(stderr, "ok\n");
695 * When an operation in a pipeline fails the rest of the pipeline is flushed. We
696 * still have to get results for each pipeline item, but the item will just be
697 * a PGRES_PIPELINE_ABORTED code.
699 * This intentionally doesn't use a transaction to wrap the pipeline. You should
700 * usually use an xact, but in this case we want to observe the effects of each
701 * statement.
703 static void
704 test_pipeline_abort(PGconn *conn)
706 PGresult *res = NULL;
707 const char *dummy_params[1] = {"1"};
708 Oid dummy_param_oids[1] = {INT4OID};
709 int i;
710 int gotrows;
711 bool goterror;
713 fprintf(stderr, "aborted pipeline... ");
715 res = PQexec(conn, drop_table_sql);
716 if (PQresultStatus(res) != PGRES_COMMAND_OK)
717 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
719 res = PQexec(conn, create_table_sql);
720 if (PQresultStatus(res) != PGRES_COMMAND_OK)
721 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
724 * Queue up a couple of small pipelines and process each without returning
725 * to command mode first. Make sure the second operation in the first
726 * pipeline ERRORs.
728 if (PQenterPipelineMode(conn) != 1)
729 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
731 dummy_params[0] = "1";
732 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
733 dummy_params, NULL, NULL, 0) != 1)
734 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
736 if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
737 1, dummy_param_oids, dummy_params,
738 NULL, NULL, 0) != 1)
739 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
741 dummy_params[0] = "2";
742 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
743 dummy_params, NULL, NULL, 0) != 1)
744 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
746 if (PQpipelineSync(conn) != 1)
747 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
749 dummy_params[0] = "3";
750 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
751 dummy_params, NULL, NULL, 0) != 1)
752 pg_fatal("dispatching second-pipeline insert failed: %s",
753 PQerrorMessage(conn));
755 if (PQpipelineSync(conn) != 1)
756 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
759 * OK, start processing the pipeline results.
761 * We should get a command-ok for the first query, then a fatal error and
762 * a pipeline aborted message for the second insert, a pipeline-end, then
763 * a command-ok and a pipeline-ok for the second pipeline operation.
765 res = PQgetResult(conn);
766 if (res == NULL)
767 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
768 if (PQresultStatus(res) != PGRES_COMMAND_OK)
769 pg_fatal("Unexpected result status %s: %s",
770 PQresStatus(PQresultStatus(res)),
771 PQresultErrorMessage(res));
772 PQclear(res);
774 /* NULL result to signal end-of-results for this command */
775 if ((res = PQgetResult(conn)) != NULL)
776 pg_fatal("Expected null result, got %s",
777 PQresStatus(PQresultStatus(res)));
779 /* Second query caused error, so we expect an error next */
780 res = PQgetResult(conn);
781 if (res == NULL)
782 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
783 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
784 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
785 PQresStatus(PQresultStatus(res)));
786 PQclear(res);
788 /* NULL result to signal end-of-results for this command */
789 if ((res = PQgetResult(conn)) != NULL)
790 pg_fatal("Expected null result, got %s",
791 PQresStatus(PQresultStatus(res)));
794 * pipeline should now be aborted.
796 * Note that we could still queue more queries at this point if we wanted;
797 * they'd get added to a new third pipeline since we've already sent a
798 * second. The aborted flag relates only to the pipeline being received.
800 if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
801 pg_fatal("pipeline should be flagged as aborted but isn't");
803 /* third query in pipeline, the second insert */
804 res = PQgetResult(conn);
805 if (res == NULL)
806 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
807 if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
808 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
809 PQresStatus(PQresultStatus(res)));
810 PQclear(res);
812 /* NULL result to signal end-of-results for this command */
813 if ((res = PQgetResult(conn)) != NULL)
814 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
816 if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
817 pg_fatal("pipeline should be flagged as aborted but isn't");
819 /* Ensure we're still in pipeline */
820 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
821 pg_fatal("Fell out of pipeline mode somehow");
824 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
826 * (This is so clients know to start processing results normally again and
827 * can tell the difference between skipped commands and the sync.)
829 res = PQgetResult(conn);
830 if (res == NULL)
831 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
832 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
833 pg_fatal("Unexpected result code from first pipeline sync\n"
834 "Expected PGRES_PIPELINE_SYNC, got %s",
835 PQresStatus(PQresultStatus(res)));
836 PQclear(res);
838 if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
839 pg_fatal("sync should've cleared the aborted flag but didn't");
841 /* We're still in pipeline mode... */
842 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
843 pg_fatal("Fell out of pipeline mode somehow");
845 /* the insert from the second pipeline */
846 res = PQgetResult(conn);
847 if (res == NULL)
848 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
849 if (PQresultStatus(res) != PGRES_COMMAND_OK)
850 pg_fatal("Unexpected result code %s from first item in second pipeline",
851 PQresStatus(PQresultStatus(res)));
852 PQclear(res);
854 /* Read the NULL result at the end of the command */
855 if ((res = PQgetResult(conn)) != NULL)
856 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
858 /* the second pipeline sync */
859 if ((res = PQgetResult(conn)) == NULL)
860 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
861 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
862 pg_fatal("Unexpected result code %s from second pipeline sync",
863 PQresStatus(PQresultStatus(res)));
864 PQclear(res);
866 if ((res = PQgetResult(conn)) != NULL)
867 pg_fatal("Expected null result, got %s: %s",
868 PQresStatus(PQresultStatus(res)),
869 PQerrorMessage(conn));
871 /* Try to send two queries in one command */
872 if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
873 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
874 if (PQpipelineSync(conn) != 1)
875 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
876 goterror = false;
877 while ((res = PQgetResult(conn)) != NULL)
879 switch (PQresultStatus(res))
881 case PGRES_FATAL_ERROR:
882 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
883 pg_fatal("expected error about multiple commands, got %s",
884 PQerrorMessage(conn));
885 printf("got expected %s", PQerrorMessage(conn));
886 goterror = true;
887 break;
888 default:
889 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
890 break;
893 if (!goterror)
894 pg_fatal("did not get cannot-insert-multiple-commands error");
895 res = PQgetResult(conn);
896 if (res == NULL)
897 pg_fatal("got NULL result");
898 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
899 pg_fatal("Unexpected result code %s from pipeline sync",
900 PQresStatus(PQresultStatus(res)));
901 fprintf(stderr, "ok\n");
903 /* Test single-row mode with an error partways */
904 if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
905 0, NULL, NULL, NULL, NULL, 0) != 1)
906 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
907 if (PQpipelineSync(conn) != 1)
908 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
909 PQsetSingleRowMode(conn);
910 goterror = false;
911 gotrows = 0;
912 while ((res = PQgetResult(conn)) != NULL)
914 switch (PQresultStatus(res))
916 case PGRES_SINGLE_TUPLE:
917 printf("got row: %s\n", PQgetvalue(res, 0, 0));
918 gotrows++;
919 break;
920 case PGRES_FATAL_ERROR:
921 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
922 pg_fatal("expected division-by-zero, got: %s (%s)",
923 PQerrorMessage(conn),
924 PQresultErrorField(res, PG_DIAG_SQLSTATE));
925 printf("got expected division-by-zero\n");
926 goterror = true;
927 break;
928 default:
929 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
931 PQclear(res);
933 if (!goterror)
934 pg_fatal("did not get division-by-zero error");
935 if (gotrows != 3)
936 pg_fatal("did not get three rows");
937 /* the third pipeline sync */
938 if ((res = PQgetResult(conn)) == NULL)
939 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
940 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
941 pg_fatal("Unexpected result code %s from third pipeline sync",
942 PQresStatus(PQresultStatus(res)));
943 PQclear(res);
945 /* We're still in pipeline mode... */
946 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
947 pg_fatal("Fell out of pipeline mode somehow");
949 /* until we end it, which we can safely do now */
950 if (PQexitPipelineMode(conn) != 1)
951 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
952 PQerrorMessage(conn));
954 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
955 pg_fatal("exiting pipeline mode didn't seem to work");
958 * Since we fired the pipelines off without a surrounding xact, the results
959 * should be:
961 * - Implicit xact started by server around 1st pipeline
962 * - First insert applied
963 * - Second statement aborted xact
964 * - Third insert skipped
965 * - Sync rolled back first implicit xact
966 * - Implicit xact created by server around 2nd pipeline
967 * - insert applied from 2nd pipeline
968 * - Sync commits 2nd xact
970 * So we should only have the value 3 that we inserted.
972 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
974 if (PQresultStatus(res) != PGRES_TUPLES_OK)
975 pg_fatal("Expected tuples, got %s: %s",
976 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
977 if (PQntuples(res) != 1)
978 pg_fatal("expected 1 result, got %d", PQntuples(res));
979 for (i = 0; i < PQntuples(res); i++)
981 const char *val = PQgetvalue(res, i, 0);
983 if (strcmp(val, "3") != 0)
984 pg_fatal("expected only insert with value 3, got %s", val);
987 PQclear(res);
989 fprintf(stderr, "ok\n");
992 /* State machine enum for test_pipelined_insert */
993 enum PipelineInsertStep
995 BI_BEGIN_TX,
996 BI_DROP_TABLE,
997 BI_CREATE_TABLE,
998 BI_PREPARE,
999 BI_INSERT_ROWS,
1000 BI_COMMIT_TX,
1001 BI_SYNC,
1002 BI_DONE,
1005 static void
1006 test_pipelined_insert(PGconn *conn, int n_rows)
1008 Oid insert_param_oids[2] = {INT4OID, INT8OID};
1009 const char *insert_params[2];
1010 char insert_param_0[MAXINTLEN];
1011 char insert_param_1[MAXINT8LEN];
1012 enum PipelineInsertStep send_step = BI_BEGIN_TX,
1013 recv_step = BI_BEGIN_TX;
1014 int rows_to_send,
1015 rows_to_receive;
1017 insert_params[0] = insert_param_0;
1018 insert_params[1] = insert_param_1;
1020 rows_to_send = rows_to_receive = n_rows;
1023 * Do a pipelined insert into a table created at the start of the pipeline
1025 if (PQenterPipelineMode(conn) != 1)
1026 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1028 while (send_step != BI_PREPARE)
1030 const char *sql;
1032 switch (send_step)
1034 case BI_BEGIN_TX:
1035 sql = "BEGIN TRANSACTION";
1036 send_step = BI_DROP_TABLE;
1037 break;
1039 case BI_DROP_TABLE:
1040 sql = drop_table_sql;
1041 send_step = BI_CREATE_TABLE;
1042 break;
1044 case BI_CREATE_TABLE:
1045 sql = create_table_sql;
1046 send_step = BI_PREPARE;
1047 break;
1049 default:
1050 pg_fatal("invalid state");
1051 sql = NULL; /* keep compiler quiet */
1054 pg_debug("sending: %s\n", sql);
1055 if (PQsendQueryParams(conn, sql,
1056 0, NULL, NULL, NULL, NULL, 0) != 1)
1057 pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1060 Assert(send_step == BI_PREPARE);
1061 pg_debug("sending: %s\n", insert_sql2);
1062 if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1063 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1064 send_step = BI_INSERT_ROWS;
1067 * Now we start inserting. We'll be sending enough data that we could fill
1068 * our output buffer, so to avoid deadlocking we need to enter nonblocking
1069 * mode and consume input while we send more output. As results of each
1070 * query are processed we should pop them to allow processing of the next
1071 * query. There's no need to finish the pipeline before processing
1072 * results.
1074 if (PQsetnonblocking(conn, 1) != 0)
1075 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1077 while (recv_step != BI_DONE)
1079 int sock;
1080 fd_set input_mask;
1081 fd_set output_mask;
1083 sock = PQsocket(conn);
1085 if (sock < 0)
1086 break; /* shouldn't happen */
1088 FD_ZERO(&input_mask);
1089 FD_SET(sock, &input_mask);
1090 FD_ZERO(&output_mask);
1091 FD_SET(sock, &output_mask);
1093 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1095 fprintf(stderr, "select() failed: %m\n");
1096 exit_nicely(conn);
1100 * Process any results, so we keep the server's output buffer free
1101 * flowing and it can continue to process input
1103 if (FD_ISSET(sock, &input_mask))
1105 PQconsumeInput(conn);
1107 /* Read until we'd block if we tried to read */
1108 while (!PQisBusy(conn) && recv_step < BI_DONE)
1110 PGresult *res;
1111 const char *cmdtag = "";
1112 const char *description = "";
1113 int status;
1116 * Read next result. If no more results from this query,
1117 * advance to the next query
1119 res = PQgetResult(conn);
1120 if (res == NULL)
1121 continue;
1123 status = PGRES_COMMAND_OK;
1124 switch (recv_step)
1126 case BI_BEGIN_TX:
1127 cmdtag = "BEGIN";
1128 recv_step++;
1129 break;
1130 case BI_DROP_TABLE:
1131 cmdtag = "DROP TABLE";
1132 recv_step++;
1133 break;
1134 case BI_CREATE_TABLE:
1135 cmdtag = "CREATE TABLE";
1136 recv_step++;
1137 break;
1138 case BI_PREPARE:
1139 cmdtag = "";
1140 description = "PREPARE";
1141 recv_step++;
1142 break;
1143 case BI_INSERT_ROWS:
1144 cmdtag = "INSERT";
1145 rows_to_receive--;
1146 if (rows_to_receive == 0)
1147 recv_step++;
1148 break;
1149 case BI_COMMIT_TX:
1150 cmdtag = "COMMIT";
1151 recv_step++;
1152 break;
1153 case BI_SYNC:
1154 cmdtag = "";
1155 description = "SYNC";
1156 status = PGRES_PIPELINE_SYNC;
1157 recv_step++;
1158 break;
1159 case BI_DONE:
1160 /* unreachable */
1161 pg_fatal("unreachable state");
1164 if (PQresultStatus(res) != status)
1165 pg_fatal("%s reported status %s, expected %s\n"
1166 "Error message: \"%s\"",
1167 description, PQresStatus(PQresultStatus(res)),
1168 PQresStatus(status), PQerrorMessage(conn));
1170 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1171 pg_fatal("%s expected command tag '%s', got '%s'",
1172 description, cmdtag, PQcmdStatus(res));
1174 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1176 PQclear(res);
1180 /* Write more rows and/or the end pipeline message, if needed */
1181 if (FD_ISSET(sock, &output_mask))
1183 PQflush(conn);
1185 if (send_step == BI_INSERT_ROWS)
1187 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1188 /* use up some buffer space with a wide value */
1189 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1191 if (PQsendQueryPrepared(conn, "my_insert",
1192 2, insert_params, NULL, NULL, 0) == 1)
1194 pg_debug("sent row %d\n", rows_to_send);
1196 rows_to_send--;
1197 if (rows_to_send == 0)
1198 send_step++;
1200 else
1203 * in nonblocking mode, so it's OK for an insert to fail
1204 * to send
1206 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1207 rows_to_send, PQerrorMessage(conn));
1210 else if (send_step == BI_COMMIT_TX)
1212 if (PQsendQueryParams(conn, "COMMIT",
1213 0, NULL, NULL, NULL, NULL, 0) == 1)
1215 pg_debug("sent COMMIT\n");
1216 send_step++;
1218 else
1220 fprintf(stderr, "WARNING: failed to send commit: %s\n",
1221 PQerrorMessage(conn));
1224 else if (send_step == BI_SYNC)
1226 if (PQpipelineSync(conn) == 1)
1228 fprintf(stdout, "pipeline sync sent\n");
1229 send_step++;
1231 else
1233 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1234 PQerrorMessage(conn));
1240 /* We've got the sync message and the pipeline should be done */
1241 if (PQexitPipelineMode(conn) != 1)
1242 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1243 PQerrorMessage(conn));
1245 if (PQsetnonblocking(conn, 0) != 0)
1246 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1248 fprintf(stderr, "ok\n");
1251 static void
1252 test_prepared(PGconn *conn)
1254 PGresult *res = NULL;
1255 Oid param_oids[1] = {INT4OID};
1256 Oid expected_oids[4];
1257 Oid typ;
1259 fprintf(stderr, "prepared... ");
1261 if (PQenterPipelineMode(conn) != 1)
1262 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1263 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1264 "interval '1 sec'",
1265 1, param_oids) != 1)
1266 pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1267 expected_oids[0] = INT4OID;
1268 expected_oids[1] = TEXTOID;
1269 expected_oids[2] = NUMERICOID;
1270 expected_oids[3] = INTERVALOID;
1271 if (PQsendDescribePrepared(conn, "select_one") != 1)
1272 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1273 if (PQpipelineSync(conn) != 1)
1274 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1276 res = PQgetResult(conn);
1277 if (res == NULL)
1278 pg_fatal("PQgetResult returned null");
1279 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1280 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1281 PQclear(res);
1282 res = PQgetResult(conn);
1283 if (res != NULL)
1284 pg_fatal("expected NULL result");
1286 res = PQgetResult(conn);
1287 if (res == NULL)
1288 pg_fatal("PQgetResult returned NULL");
1289 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1290 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1291 if (PQnfields(res) != lengthof(expected_oids))
1292 pg_fatal("expected %zu columns, got %d",
1293 lengthof(expected_oids), PQnfields(res));
1294 for (int i = 0; i < PQnfields(res); i++)
1296 typ = PQftype(res, i);
1297 if (typ != expected_oids[i])
1298 pg_fatal("field %d: expected type %u, got %u",
1299 i, expected_oids[i], typ);
1301 PQclear(res);
1302 res = PQgetResult(conn);
1303 if (res != NULL)
1304 pg_fatal("expected NULL result");
1306 res = PQgetResult(conn);
1307 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1308 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1310 fprintf(stderr, "closing statement..");
1311 if (PQsendClosePrepared(conn, "select_one") != 1)
1312 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1313 if (PQpipelineSync(conn) != 1)
1314 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1316 res = PQgetResult(conn);
1317 if (res == NULL)
1318 pg_fatal("expected non-NULL result");
1319 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1320 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1321 PQclear(res);
1322 res = PQgetResult(conn);
1323 if (res != NULL)
1324 pg_fatal("expected NULL result");
1325 res = PQgetResult(conn);
1326 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1327 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1329 if (PQexitPipelineMode(conn) != 1)
1330 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1332 /* Now that it's closed we should get an error when describing */
1333 res = PQdescribePrepared(conn, "select_one");
1334 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1335 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1338 * Also test the blocking close, this should not fail since closing a
1339 * non-existent prepared statement is a no-op
1341 res = PQclosePrepared(conn, "select_one");
1342 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1343 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1345 fprintf(stderr, "creating portal... ");
1346 PQexec(conn, "BEGIN");
1347 PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1348 PQenterPipelineMode(conn);
1349 if (PQsendDescribePortal(conn, "cursor_one") != 1)
1350 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1351 if (PQpipelineSync(conn) != 1)
1352 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1353 res = PQgetResult(conn);
1354 if (res == NULL)
1355 pg_fatal("PQgetResult returned null");
1356 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1357 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1359 typ = PQftype(res, 0);
1360 if (typ != INT4OID)
1361 pg_fatal("portal: expected type %u, got %u",
1362 INT4OID, typ);
1363 PQclear(res);
1364 res = PQgetResult(conn);
1365 if (res != NULL)
1366 pg_fatal("expected NULL result");
1367 res = PQgetResult(conn);
1368 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1369 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1371 fprintf(stderr, "closing portal... ");
1372 if (PQsendClosePortal(conn, "cursor_one") != 1)
1373 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1374 if (PQpipelineSync(conn) != 1)
1375 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1377 res = PQgetResult(conn);
1378 if (res == NULL)
1379 pg_fatal("expected non-NULL result");
1380 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1381 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1382 PQclear(res);
1383 res = PQgetResult(conn);
1384 if (res != NULL)
1385 pg_fatal("expected NULL result");
1386 res = PQgetResult(conn);
1387 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1388 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1390 if (PQexitPipelineMode(conn) != 1)
1391 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1393 /* Now that it's closed we should get an error when describing */
1394 res = PQdescribePortal(conn, "cursor_one");
1395 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1396 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1399 * Also test the blocking close, this should not fail since closing a
1400 * non-existent portal is a no-op
1402 res = PQclosePortal(conn, "cursor_one");
1403 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1404 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1406 fprintf(stderr, "ok\n");
1409 /* Notice processor: print notices, and count how many we got */
1410 static void
1411 notice_processor(void *arg, const char *message)
1413 int *n_notices = (int *) arg;
1415 (*n_notices)++;
1416 fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1419 /* Verify behavior in "idle" state */
1420 static void
1421 test_pipeline_idle(PGconn *conn)
1423 PGresult *res;
1424 int n_notices = 0;
1426 fprintf(stderr, "\npipeline idle...\n");
1428 PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1430 /* Try to exit pipeline mode in pipeline-idle state */
1431 if (PQenterPipelineMode(conn) != 1)
1432 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1433 if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1434 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1435 PQsendFlushRequest(conn);
1436 res = PQgetResult(conn);
1437 if (res == NULL)
1438 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1439 PQerrorMessage(conn));
1440 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1441 pg_fatal("unexpected result code %s from first pipeline item",
1442 PQresStatus(PQresultStatus(res)));
1443 PQclear(res);
1444 res = PQgetResult(conn);
1445 if (res != NULL)
1446 pg_fatal("did not receive terminating NULL");
1447 if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1448 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1449 if (PQexitPipelineMode(conn) == 1)
1450 pg_fatal("exiting pipeline succeeded when it shouldn't");
1451 if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1452 strlen("cannot exit pipeline mode")) != 0)
1453 pg_fatal("did not get expected error; got: %s",
1454 PQerrorMessage(conn));
1455 PQsendFlushRequest(conn);
1456 res = PQgetResult(conn);
1457 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1458 pg_fatal("unexpected result code %s from second pipeline item",
1459 PQresStatus(PQresultStatus(res)));
1460 PQclear(res);
1461 res = PQgetResult(conn);
1462 if (res != NULL)
1463 pg_fatal("did not receive terminating NULL");
1464 if (PQexitPipelineMode(conn) != 1)
1465 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1467 if (n_notices > 0)
1468 pg_fatal("got %d notice(s)", n_notices);
1469 fprintf(stderr, "ok - 1\n");
1471 /* Have a WARNING in the middle of a resultset */
1472 if (PQenterPipelineMode(conn) != 1)
1473 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1474 if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1475 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1476 PQsendFlushRequest(conn);
1477 res = PQgetResult(conn);
1478 if (res == NULL)
1479 pg_fatal("unexpected NULL result received");
1480 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1481 pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1482 if (PQexitPipelineMode(conn) != 1)
1483 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1484 fprintf(stderr, "ok - 2\n");
1487 static void
1488 test_simple_pipeline(PGconn *conn)
1490 PGresult *res = NULL;
1491 const char *dummy_params[1] = {"1"};
1492 Oid dummy_param_oids[1] = {INT4OID};
1494 fprintf(stderr, "simple pipeline... ");
1497 * Enter pipeline mode and dispatch a set of operations, which we'll then
1498 * process the results of as they come in.
1500 * For a simple case we should be able to do this without interim
1501 * processing of results since our output buffer will give us enough slush
1502 * to work with and we won't block on sending. So blocking mode is fine.
1504 if (PQisnonblocking(conn))
1505 pg_fatal("Expected blocking connection mode");
1507 if (PQenterPipelineMode(conn) != 1)
1508 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1510 if (PQsendQueryParams(conn, "SELECT $1",
1511 1, dummy_param_oids, dummy_params,
1512 NULL, NULL, 0) != 1)
1513 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1515 if (PQexitPipelineMode(conn) != 0)
1516 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1518 if (PQpipelineSync(conn) != 1)
1519 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1521 res = PQgetResult(conn);
1522 if (res == NULL)
1523 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1524 PQerrorMessage(conn));
1526 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1527 pg_fatal("Unexpected result code %s from first pipeline item",
1528 PQresStatus(PQresultStatus(res)));
1530 PQclear(res);
1531 res = NULL;
1533 if (PQgetResult(conn) != NULL)
1534 pg_fatal("PQgetResult returned something extra after first query result.");
1537 * Even though we've processed the result there's still a sync to come and
1538 * we can't exit pipeline mode yet
1540 if (PQexitPipelineMode(conn) != 0)
1541 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1543 res = PQgetResult(conn);
1544 if (res == NULL)
1545 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1546 PQerrorMessage(conn));
1548 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1549 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1550 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1552 PQclear(res);
1553 res = NULL;
1555 if (PQgetResult(conn) != NULL)
1556 pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1557 PQresStatus(PQresultStatus(res)));
1559 /* We're still in pipeline mode... */
1560 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1561 pg_fatal("Fell out of pipeline mode somehow");
1563 /* ... until we end it, which we can safely do now */
1564 if (PQexitPipelineMode(conn) != 1)
1565 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1566 PQerrorMessage(conn));
1568 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1569 pg_fatal("Exiting pipeline mode didn't seem to work");
1571 fprintf(stderr, "ok\n");
1574 static void
1575 test_singlerowmode(PGconn *conn)
1577 PGresult *res;
1578 int i;
1579 bool pipeline_ended = false;
1581 if (PQenterPipelineMode(conn) != 1)
1582 pg_fatal("failed to enter pipeline mode: %s",
1583 PQerrorMessage(conn));
1585 /* One series of three commands, using single-row mode for the first two. */
1586 for (i = 0; i < 3; i++)
1588 char *param[1];
1590 param[0] = psprintf("%d", 44 + i);
1592 if (PQsendQueryParams(conn,
1593 "SELECT generate_series(42, $1)",
1595 NULL,
1596 (const char **) param,
1597 NULL,
1598 NULL,
1599 0) != 1)
1600 pg_fatal("failed to send query: %s",
1601 PQerrorMessage(conn));
1602 pfree(param[0]);
1604 if (PQpipelineSync(conn) != 1)
1605 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1607 for (i = 0; !pipeline_ended; i++)
1609 bool first = true;
1610 bool saw_ending_tuplesok;
1611 bool isSingleTuple = false;
1613 /* Set single row mode for only first 2 SELECT queries */
1614 if (i < 2)
1616 if (PQsetSingleRowMode(conn) != 1)
1617 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1620 /* Consume rows for this query */
1621 saw_ending_tuplesok = false;
1622 while ((res = PQgetResult(conn)) != NULL)
1624 ExecStatusType est = PQresultStatus(res);
1626 if (est == PGRES_PIPELINE_SYNC)
1628 fprintf(stderr, "end of pipeline reached\n");
1629 pipeline_ended = true;
1630 PQclear(res);
1631 if (i != 3)
1632 pg_fatal("Expected three results, got %d", i);
1633 break;
1636 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1637 if (first)
1639 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1640 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1641 i, PQresStatus(est));
1642 if (i >= 2 && est != PGRES_TUPLES_OK)
1643 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1644 i, PQresStatus(est));
1645 first = false;
1648 fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1649 switch (est)
1651 case PGRES_TUPLES_OK:
1652 fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1653 saw_ending_tuplesok = true;
1654 if (isSingleTuple)
1656 if (PQntuples(res) == 0)
1657 fprintf(stderr, "all tuples received in query %d\n", i);
1658 else
1659 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1661 break;
1663 case PGRES_SINGLE_TUPLE:
1664 isSingleTuple = true;
1665 fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1666 break;
1668 default:
1669 pg_fatal("unexpected");
1671 PQclear(res);
1673 if (!pipeline_ended && !saw_ending_tuplesok)
1674 pg_fatal("didn't get expected terminating TUPLES_OK");
1678 * Now issue one command, get its results in with single-row mode, then
1679 * issue another command, and get its results in normal mode; make sure
1680 * the single-row mode flag is reset as expected.
1682 if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1683 0, NULL, NULL, NULL, NULL, 0) != 1)
1684 pg_fatal("failed to send query: %s",
1685 PQerrorMessage(conn));
1686 if (PQsendFlushRequest(conn) != 1)
1687 pg_fatal("failed to send flush request");
1688 if (PQsetSingleRowMode(conn) != 1)
1689 pg_fatal("PQsetSingleRowMode() failed");
1690 res = PQgetResult(conn);
1691 if (res == NULL)
1692 pg_fatal("unexpected NULL");
1693 if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
1694 pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1695 PQresStatus(PQresultStatus(res)));
1696 res = PQgetResult(conn);
1697 if (res == NULL)
1698 pg_fatal("unexpected NULL");
1699 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1700 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1701 PQresStatus(PQresultStatus(res)));
1702 if (PQgetResult(conn) != NULL)
1703 pg_fatal("expected NULL result");
1705 if (PQsendQueryParams(conn, "SELECT 1",
1706 0, NULL, NULL, NULL, NULL, 0) != 1)
1707 pg_fatal("failed to send query: %s",
1708 PQerrorMessage(conn));
1709 if (PQsendFlushRequest(conn) != 1)
1710 pg_fatal("failed to send flush request");
1711 res = PQgetResult(conn);
1712 if (res == NULL)
1713 pg_fatal("unexpected NULL");
1714 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1715 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1716 PQresStatus(PQresultStatus(res)));
1717 if (PQgetResult(conn) != NULL)
1718 pg_fatal("expected NULL result");
1721 * Try chunked mode as well; make sure that it correctly delivers a
1722 * partial final chunk.
1724 if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1725 0, NULL, NULL, NULL, NULL, 0) != 1)
1726 pg_fatal("failed to send query: %s",
1727 PQerrorMessage(conn));
1728 if (PQsendFlushRequest(conn) != 1)
1729 pg_fatal("failed to send flush request");
1730 if (PQsetChunkedRowsMode(conn, 3) != 1)
1731 pg_fatal("PQsetChunkedRowsMode() failed");
1732 res = PQgetResult(conn);
1733 if (res == NULL)
1734 pg_fatal("unexpected NULL");
1735 if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1736 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1737 PQresStatus(PQresultStatus(res)),
1738 PQerrorMessage(conn));
1739 if (PQntuples(res) != 3)
1740 pg_fatal("Expected 3 rows, got %d", PQntuples(res));
1741 res = PQgetResult(conn);
1742 if (res == NULL)
1743 pg_fatal("unexpected NULL");
1744 if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
1745 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1746 PQresStatus(PQresultStatus(res)));
1747 if (PQntuples(res) != 2)
1748 pg_fatal("Expected 2 rows, got %d", PQntuples(res));
1749 res = PQgetResult(conn);
1750 if (res == NULL)
1751 pg_fatal("unexpected NULL");
1752 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1753 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1754 PQresStatus(PQresultStatus(res)));
1755 if (PQntuples(res) != 0)
1756 pg_fatal("Expected 0 rows, got %d", PQntuples(res));
1757 if (PQgetResult(conn) != NULL)
1758 pg_fatal("expected NULL result");
1760 if (PQexitPipelineMode(conn) != 1)
1761 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1763 fprintf(stderr, "ok\n");
1767 * Simple test to verify that a pipeline is discarded as a whole when there's
1768 * an error, ignoring transaction commands.
1770 static void
1771 test_transaction(PGconn *conn)
1773 PGresult *res;
1774 bool expect_null;
1775 int num_syncs = 0;
1777 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1778 "CREATE TABLE pq_pipeline_tst (id int)");
1779 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1780 pg_fatal("failed to create test table: %s",
1781 PQerrorMessage(conn));
1782 PQclear(res);
1784 if (PQenterPipelineMode(conn) != 1)
1785 pg_fatal("failed to enter pipeline mode: %s",
1786 PQerrorMessage(conn));
1787 if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1788 pg_fatal("could not send prepare on pipeline: %s",
1789 PQerrorMessage(conn));
1791 if (PQsendQueryParams(conn,
1792 "BEGIN",
1793 0, NULL, NULL, NULL, NULL, 0) != 1)
1794 pg_fatal("failed to send query: %s",
1795 PQerrorMessage(conn));
1796 if (PQsendQueryParams(conn,
1797 "SELECT 0/0",
1798 0, NULL, NULL, NULL, NULL, 0) != 1)
1799 pg_fatal("failed to send query: %s",
1800 PQerrorMessage(conn));
1803 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1804 * get out of the pipeline-aborted state first.
1806 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1807 pg_fatal("failed to execute prepared: %s",
1808 PQerrorMessage(conn));
1810 /* This insert fails because we're in pipeline-aborted state */
1811 if (PQsendQueryParams(conn,
1812 "INSERT INTO pq_pipeline_tst VALUES (1)",
1813 0, NULL, NULL, NULL, NULL, 0) != 1)
1814 pg_fatal("failed to send query: %s",
1815 PQerrorMessage(conn));
1816 if (PQpipelineSync(conn) != 1)
1817 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1818 num_syncs++;
1821 * This insert fails even though the pipeline got a SYNC, because we're in
1822 * an aborted transaction
1824 if (PQsendQueryParams(conn,
1825 "INSERT INTO pq_pipeline_tst VALUES (2)",
1826 0, NULL, NULL, NULL, NULL, 0) != 1)
1827 pg_fatal("failed to send query: %s",
1828 PQerrorMessage(conn));
1829 if (PQpipelineSync(conn) != 1)
1830 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1831 num_syncs++;
1834 * Send ROLLBACK using prepared stmt. This one works because we just did
1835 * PQpipelineSync above.
1837 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1838 pg_fatal("failed to execute prepared: %s",
1839 PQerrorMessage(conn));
1842 * Now that we're out of a transaction and in pipeline-good mode, this
1843 * insert works
1845 if (PQsendQueryParams(conn,
1846 "INSERT INTO pq_pipeline_tst VALUES (3)",
1847 0, NULL, NULL, NULL, NULL, 0) != 1)
1848 pg_fatal("failed to send query: %s",
1849 PQerrorMessage(conn));
1850 /* Send two syncs now -- match up to SYNC messages below */
1851 if (PQpipelineSync(conn) != 1)
1852 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1853 num_syncs++;
1854 if (PQpipelineSync(conn) != 1)
1855 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1856 num_syncs++;
1858 expect_null = false;
1859 for (int i = 0;; i++)
1861 ExecStatusType restype;
1863 res = PQgetResult(conn);
1864 if (res == NULL)
1866 printf("%d: got NULL result\n", i);
1867 if (!expect_null)
1868 pg_fatal("did not expect NULL here");
1869 expect_null = false;
1870 continue;
1872 restype = PQresultStatus(res);
1873 printf("%d: got status %s", i, PQresStatus(restype));
1874 if (expect_null)
1875 pg_fatal("expected NULL");
1876 if (restype == PGRES_FATAL_ERROR)
1877 printf("; error: %s", PQerrorMessage(conn));
1878 else if (restype == PGRES_PIPELINE_ABORTED)
1880 printf(": command didn't run because pipeline aborted\n");
1882 else
1883 printf("\n");
1884 PQclear(res);
1886 if (restype == PGRES_PIPELINE_SYNC)
1887 num_syncs--;
1888 else
1889 expect_null = true;
1890 if (num_syncs <= 0)
1891 break;
1893 if (PQgetResult(conn) != NULL)
1894 pg_fatal("returned something extra after all the syncs: %s",
1895 PQresStatus(PQresultStatus(res)));
1897 if (PQexitPipelineMode(conn) != 1)
1898 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1900 /* We expect to find one tuple containing the value "3" */
1901 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1902 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1903 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1904 if (PQntuples(res) != 1)
1905 pg_fatal("did not get 1 tuple");
1906 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1907 pg_fatal("did not get expected tuple");
1908 PQclear(res);
1910 fprintf(stderr, "ok\n");
1914 * In this test mode we send a stream of queries, with one in the middle
1915 * causing an error. Verify that we can still send some more after the
1916 * error and have libpq work properly.
1918 static void
1919 test_uniqviol(PGconn *conn)
1921 int sock = PQsocket(conn);
1922 PGresult *res;
1923 Oid paramTypes[2] = {INT8OID, INT8OID};
1924 const char *paramValues[2];
1925 char paramValue0[MAXINT8LEN];
1926 char paramValue1[MAXINT8LEN];
1927 int ctr = 0;
1928 int numsent = 0;
1929 int results = 0;
1930 bool read_done = false;
1931 bool write_done = false;
1932 bool error_sent = false;
1933 bool got_error = false;
1934 int switched = 0;
1935 int socketful = 0;
1936 fd_set in_fds;
1937 fd_set out_fds;
1939 fprintf(stderr, "uniqviol ...");
1941 PQsetnonblocking(conn, 1);
1943 paramValues[0] = paramValue0;
1944 paramValues[1] = paramValue1;
1945 sprintf(paramValue1, "42");
1947 res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1948 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1949 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1950 pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1952 res = PQexec(conn, "begin");
1953 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1954 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1956 res = PQprepare(conn, "insertion",
1957 "insert into ppln_uniqviol values ($1, $2) returning id",
1958 2, paramTypes);
1959 if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1960 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1962 if (PQenterPipelineMode(conn) != 1)
1963 pg_fatal("failed to enter pipeline mode");
1965 while (!read_done)
1968 * Avoid deadlocks by reading everything the server has sent before
1969 * sending anything. (Special precaution is needed here to process
1970 * PQisBusy before testing the socket for read-readiness, because the
1971 * socket does not turn read-ready after "sending" queries in aborted
1972 * pipeline mode.)
1974 while (PQisBusy(conn) == 0)
1976 bool new_error;
1978 if (results >= numsent)
1980 if (write_done)
1981 read_done = true;
1982 break;
1985 res = PQgetResult(conn);
1986 new_error = process_result(conn, res, results, numsent);
1987 if (new_error && got_error)
1988 pg_fatal("got two errors");
1989 got_error |= new_error;
1990 if (results++ >= numsent - 1)
1992 if (write_done)
1993 read_done = true;
1994 break;
1998 if (read_done)
1999 break;
2001 FD_ZERO(&out_fds);
2002 FD_SET(sock, &out_fds);
2004 FD_ZERO(&in_fds);
2005 FD_SET(sock, &in_fds);
2007 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2009 if (errno == EINTR)
2010 continue;
2011 pg_fatal("select() failed: %m");
2014 if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
2015 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2018 * If the socket is writable and we haven't finished sending queries,
2019 * send some.
2021 if (!write_done && FD_ISSET(sock, &out_fds))
2023 for (;;)
2025 int flush;
2028 * provoke uniqueness violation exactly once after having
2029 * switched to read mode.
2031 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2033 sprintf(paramValue0, "%d", numsent / 2);
2034 fprintf(stderr, "E");
2035 error_sent = true;
2037 else
2039 fprintf(stderr, ".");
2040 sprintf(paramValue0, "%d", ctr++);
2043 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
2044 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
2045 numsent++;
2047 /* Are we done writing? */
2048 if (socketful != 0 && numsent % socketful == 42 && error_sent)
2050 if (PQsendFlushRequest(conn) != 1)
2051 pg_fatal("failed to send flush request");
2052 write_done = true;
2053 fprintf(stderr, "\ndone writing\n");
2054 PQflush(conn);
2055 break;
2058 /* is the outgoing socket full? */
2059 flush = PQflush(conn);
2060 if (flush == -1)
2061 pg_fatal("failed to flush: %s", PQerrorMessage(conn));
2062 if (flush == 1)
2064 if (socketful == 0)
2065 socketful = numsent;
2066 fprintf(stderr, "\nswitch to reading\n");
2067 switched++;
2068 break;
2074 if (!got_error)
2075 pg_fatal("did not get expected error");
2077 fprintf(stderr, "ok\n");
2081 * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2082 * the expected NULL that should follow it.
2084 * Returns true if we read a fatal error message, otherwise false.
2086 static bool
2087 process_result(PGconn *conn, PGresult *res, int results, int numsent)
2089 PGresult *res2;
2090 bool got_error = false;
2092 if (res == NULL)
2093 pg_fatal("got unexpected NULL");
2095 switch (PQresultStatus(res))
2097 case PGRES_FATAL_ERROR:
2098 got_error = true;
2099 fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2100 PQclear(res);
2102 res2 = PQgetResult(conn);
2103 if (res2 != NULL)
2104 pg_fatal("expected NULL, got %s",
2105 PQresStatus(PQresultStatus(res2)));
2106 break;
2108 case PGRES_TUPLES_OK:
2109 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2110 PQclear(res);
2112 res2 = PQgetResult(conn);
2113 if (res2 != NULL)
2114 pg_fatal("expected NULL, got %s",
2115 PQresStatus(PQresultStatus(res2)));
2116 break;
2118 case PGRES_PIPELINE_ABORTED:
2119 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2120 res2 = PQgetResult(conn);
2121 if (res2 != NULL)
2122 pg_fatal("expected NULL, got %s",
2123 PQresStatus(PQresultStatus(res2)));
2124 break;
2126 default:
2127 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2130 return got_error;
2134 static void
2135 usage(const char *progname)
2137 fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
2138 fprintf(stderr, "Usage:\n");
2139 fprintf(stderr, " %s [OPTION] tests\n", progname);
2140 fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
2141 fprintf(stderr, "\nOptions:\n");
2142 fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
2143 fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
2146 static void
2147 print_test_list(void)
2149 printf("cancel\n");
2150 printf("disallowed_in_pipeline\n");
2151 printf("multi_pipelines\n");
2152 printf("nosync\n");
2153 printf("pipeline_abort\n");
2154 printf("pipeline_idle\n");
2155 printf("pipelined_insert\n");
2156 printf("prepared\n");
2157 printf("simple_pipeline\n");
2158 printf("singlerow\n");
2159 printf("transaction\n");
2160 printf("uniqviol\n");
2164 main(int argc, char **argv)
2166 const char *conninfo = "";
2167 PGconn *conn;
2168 FILE *trace;
2169 char *testname;
2170 int numrows = 10000;
2171 PGresult *res;
2172 int c;
2174 while ((c = getopt(argc, argv, "r:t:")) != -1)
2176 switch (c)
2178 case 'r': /* numrows */
2179 errno = 0;
2180 numrows = strtol(optarg, NULL, 10);
2181 if (errno != 0 || numrows <= 0)
2183 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2184 optarg);
2185 exit(1);
2187 break;
2188 case 't': /* trace file */
2189 tracefile = pg_strdup(optarg);
2190 break;
2194 if (optind < argc)
2196 testname = pg_strdup(argv[optind]);
2197 optind++;
2199 else
2201 usage(argv[0]);
2202 exit(1);
2205 if (strcmp(testname, "tests") == 0)
2207 print_test_list();
2208 exit(0);
2211 if (optind < argc)
2213 conninfo = pg_strdup(argv[optind]);
2214 optind++;
2217 /* Make a connection to the database */
2218 conn = PQconnectdb(conninfo);
2219 if (PQstatus(conn) != CONNECTION_OK)
2221 fprintf(stderr, "Connection to database failed: %s\n",
2222 PQerrorMessage(conn));
2223 exit_nicely(conn);
2226 res = PQexec(conn, "SET lc_messages TO \"C\"");
2227 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2228 pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
2229 res = PQexec(conn, "SET debug_parallel_query = off");
2230 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2231 pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
2233 /* Set the trace file, if requested */
2234 if (tracefile != NULL)
2236 if (strcmp(tracefile, "-") == 0)
2237 trace = stdout;
2238 else
2239 trace = fopen(tracefile, "w");
2240 if (trace == NULL)
2241 pg_fatal("could not open file \"%s\": %m", tracefile);
2243 /* Make it line-buffered */
2244 setvbuf(trace, NULL, PG_IOLBF, 0);
2246 PQtrace(conn, trace);
2247 PQsetTraceFlags(conn,
2248 PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2251 if (strcmp(testname, "cancel") == 0)
2252 test_cancel(conn);
2253 else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2254 test_disallowed_in_pipeline(conn);
2255 else if (strcmp(testname, "multi_pipelines") == 0)
2256 test_multi_pipelines(conn);
2257 else if (strcmp(testname, "nosync") == 0)
2258 test_nosync(conn);
2259 else if (strcmp(testname, "pipeline_abort") == 0)
2260 test_pipeline_abort(conn);
2261 else if (strcmp(testname, "pipeline_idle") == 0)
2262 test_pipeline_idle(conn);
2263 else if (strcmp(testname, "pipelined_insert") == 0)
2264 test_pipelined_insert(conn, numrows);
2265 else if (strcmp(testname, "prepared") == 0)
2266 test_prepared(conn);
2267 else if (strcmp(testname, "simple_pipeline") == 0)
2268 test_simple_pipeline(conn);
2269 else if (strcmp(testname, "singlerow") == 0)
2270 test_singlerowmode(conn);
2271 else if (strcmp(testname, "transaction") == 0)
2272 test_transaction(conn);
2273 else if (strcmp(testname, "uniqviol") == 0)
2274 test_uniqviol(conn);
2275 else
2277 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2278 exit(1);
2281 /* close the connection to the database and cleanup */
2282 PQfinish(conn);
2283 return 0;