1 /*-------------------------------------------------------------------------
4 * Verify libpq pipeline execution functionality
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/test/modules/libpq_pipeline/libpq_pipeline.c
13 *-------------------------------------------------------------------------
16 #include "postgres_fe.h"
18 #include <sys/select.h>
21 #include "catalog/pg_type_d.h"
23 #include "pg_getopt.h"
26 static void exit_nicely(PGconn
*conn
);
27 static void pg_attribute_noreturn() pg_fatal_impl(int line
, const char *fmt
,...)
28 pg_attribute_printf(2, 3);
29 static bool process_result(PGconn
*conn
, PGresult
*res
, int results
,
32 static const char *const progname
= "libpq_pipeline";
34 /* Options and defaults */
35 static char *tracefile
= NULL
; /* path to PQtrace() file */
39 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
44 static const char *const drop_table_sql
=
45 "DROP TABLE IF EXISTS pq_pipeline_demo";
46 static const char *const create_table_sql
=
47 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
49 static const char *const insert_sql
=
50 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
51 static const char *const insert_sql2
=
52 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
54 /* max char length of an int32/64, plus sign and null terminator */
59 exit_nicely(PGconn
*conn
)
66 * The following few functions are wrapped in macros to make the reported line
67 * number in an error match the line number of the invocation.
71 * Print an error to stderr and terminate the program.
73 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
75 pg_attribute_noreturn()
76 pg_fatal_impl(int line
, const char *fmt
,...)
82 fprintf(stderr
, "\n%s:%d: ", progname
, line
);
84 vfprintf(stderr
, fmt
, args
);
86 Assert(fmt
[strlen(fmt
) - 1] != '\n');
87 fprintf(stderr
, "\n");
92 * Check that the query on the given connection got canceled.
94 #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
96 confirm_query_canceled_impl(int line
, PGconn
*conn
)
100 res
= PQgetResult(conn
);
102 pg_fatal_impl(line
, "PQgetResult returned null: %s",
103 PQerrorMessage(conn
));
104 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
105 pg_fatal_impl(line
, "query did not fail when it was expected");
106 if (strcmp(PQresultErrorField(res
, PG_DIAG_SQLSTATE
), "57014") != 0)
107 pg_fatal_impl(line
, "query failed with a different error than cancellation: %s",
108 PQerrorMessage(conn
));
111 while (PQisBusy(conn
))
112 PQconsumeInput(conn
);
116 * Using monitorConn, query pg_stat_activity to see that the connection with
117 * the given PID is either in the given state, or waiting on the given event
118 * (only one of them can be given).
121 wait_for_connection_state(int line
, PGconn
*monitorConn
, int procpid
,
122 char *state
, char *event
)
124 const Oid paramTypes
[] = {INT4OID
, TEXTOID
};
125 const char *paramValues
[2];
126 char *pidstr
= psprintf("%d", procpid
);
128 Assert((state
== NULL
) ^ (event
== NULL
));
130 paramValues
[0] = pidstr
;
131 paramValues
[1] = state
? state
: event
;
139 res
= PQexecParams(monitorConn
,
140 "SELECT count(*) FROM pg_stat_activity WHERE "
141 "pid = $1 AND state = $2",
142 2, paramTypes
, paramValues
, NULL
, NULL
, 0);
144 res
= PQexecParams(monitorConn
,
145 "SELECT count(*) FROM pg_stat_activity WHERE "
146 "pid = $1 AND wait_event = $2",
147 2, paramTypes
, paramValues
, NULL
, NULL
, 0);
149 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
150 pg_fatal_impl(line
, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn
));
151 if (PQntuples(res
) != 1)
152 pg_fatal_impl(line
, "unexpected number of rows received: %d", PQntuples(res
));
153 if (PQnfields(res
) != 1)
154 pg_fatal_impl(line
, "unexpected number of columns received: %d", PQnfields(res
));
155 value
= PQgetvalue(res
, 0, 0);
156 if (strcmp(value
, "0") != 0)
163 /* wait 10ms before polling again */
170 #define send_cancellable_query(conn, monitorConn) \
171 send_cancellable_query_impl(__LINE__, conn, monitorConn)
173 send_cancellable_query_impl(int line
, PGconn
*conn
, PGconn
*monitorConn
)
175 const char *env_wait
;
176 const Oid paramTypes
[1] = {INT4OID
};
179 * Wait for the connection to be idle, so that our check for an active
180 * connection below is reliable, instead of possibly seeing an outdated
183 wait_for_connection_state(line
, monitorConn
, PQbackendPID(conn
), "idle", NULL
);
185 env_wait
= getenv("PG_TEST_TIMEOUT_DEFAULT");
186 if (env_wait
== NULL
)
189 if (PQsendQueryParams(conn
, "SELECT pg_sleep($1)", 1, paramTypes
,
190 &env_wait
, NULL
, NULL
, 0) != 1)
191 pg_fatal_impl(line
, "failed to send query: %s", PQerrorMessage(conn
));
194 * Wait for the sleep to be active, because if the query is not running
195 * yet, the cancel request that we send won't have any effect.
197 wait_for_connection_state(line
, monitorConn
, PQbackendPID(conn
), NULL
, "PgSleep");
201 * Create a new connection with the same conninfo as the given one.
204 copy_connection(PGconn
*conn
)
207 PQconninfoOption
*opts
= PQconninfo(conn
);
208 const char **keywords
;
213 for (PQconninfoOption
*opt
= opts
; opt
->keyword
!= NULL
; ++opt
)
216 keywords
= pg_malloc(sizeof(char *) * nopts
);
217 vals
= pg_malloc(sizeof(char *) * nopts
);
219 for (PQconninfoOption
*opt
= opts
; opt
->keyword
!= NULL
; ++opt
)
223 keywords
[i
] = opt
->keyword
;
228 keywords
[i
] = vals
[i
] = NULL
;
230 copyConn
= PQconnectdbParams(keywords
, vals
, false);
232 if (PQstatus(copyConn
) != CONNECTION_OK
)
233 pg_fatal("Connection to database failed: %s",
234 PQerrorMessage(copyConn
));
240 * Test query cancellation routines
243 test_cancel(PGconn
*conn
)
246 PGcancelConn
*cancelConn
;
250 fprintf(stderr
, "test cancellations... ");
252 if (PQsetnonblocking(conn
, 1) != 0)
253 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn
));
256 * Make a separate connection to the database to monitor the query on the
259 monitorConn
= copy_connection(conn
);
260 Assert(PQstatus(monitorConn
) == CONNECTION_OK
);
263 send_cancellable_query(conn
, monitorConn
);
264 cancel
= PQgetCancel(conn
);
265 if (!PQcancel(cancel
, errorbuf
, sizeof(errorbuf
)))
266 pg_fatal("failed to run PQcancel: %s", errorbuf
);
267 confirm_query_canceled(conn
);
269 /* PGcancel object can be reused for the next query */
270 send_cancellable_query(conn
, monitorConn
);
271 if (!PQcancel(cancel
, errorbuf
, sizeof(errorbuf
)))
272 pg_fatal("failed to run PQcancel: %s", errorbuf
);
273 confirm_query_canceled(conn
);
275 PQfreeCancel(cancel
);
277 /* test PQrequestCancel */
278 send_cancellable_query(conn
, monitorConn
);
279 if (!PQrequestCancel(conn
))
280 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn
));
281 confirm_query_canceled(conn
);
283 /* test PQcancelBlocking */
284 send_cancellable_query(conn
, monitorConn
);
285 cancelConn
= PQcancelCreate(conn
);
286 if (!PQcancelBlocking(cancelConn
))
287 pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn
));
288 confirm_query_canceled(conn
);
289 PQcancelFinish(cancelConn
);
291 /* test PQcancelCreate and then polling with PQcancelPoll */
292 send_cancellable_query(conn
, monitorConn
);
293 cancelConn
= PQcancelCreate(conn
);
294 if (!PQcancelStart(cancelConn
))
295 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn
));
301 PostgresPollingStatusType pollres
= PQcancelPoll(cancelConn
);
302 int sock
= PQcancelSocket(cancelConn
);
304 if (pollres
== PGRES_POLLING_OK
)
307 FD_ZERO(&input_mask
);
308 FD_ZERO(&output_mask
);
311 case PGRES_POLLING_READING
:
312 pg_debug("polling for reads\n");
313 FD_SET(sock
, &input_mask
);
315 case PGRES_POLLING_WRITING
:
316 pg_debug("polling for writes\n");
317 FD_SET(sock
, &output_mask
);
320 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn
));
324 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn
));
331 if (select(sock
+ 1, &input_mask
, &output_mask
, NULL
, &tv
) < 0)
335 pg_fatal("select() failed: %m");
340 if (PQcancelStatus(cancelConn
) != CONNECTION_OK
)
341 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn
));
342 confirm_query_canceled(conn
);
345 * test PQcancelReset works on the cancel connection and it can be reused
348 PQcancelReset(cancelConn
);
350 send_cancellable_query(conn
, monitorConn
);
351 if (!PQcancelStart(cancelConn
))
352 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn
));
358 PostgresPollingStatusType pollres
= PQcancelPoll(cancelConn
);
359 int sock
= PQcancelSocket(cancelConn
);
361 if (pollres
== PGRES_POLLING_OK
)
364 FD_ZERO(&input_mask
);
365 FD_ZERO(&output_mask
);
368 case PGRES_POLLING_READING
:
369 pg_debug("polling for reads\n");
370 FD_SET(sock
, &input_mask
);
372 case PGRES_POLLING_WRITING
:
373 pg_debug("polling for writes\n");
374 FD_SET(sock
, &output_mask
);
377 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn
));
381 pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn
));
388 if (select(sock
+ 1, &input_mask
, &output_mask
, NULL
, &tv
) < 0)
392 pg_fatal("select() failed: %m");
397 if (PQcancelStatus(cancelConn
) != CONNECTION_OK
)
398 pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn
));
399 confirm_query_canceled(conn
);
401 PQcancelFinish(cancelConn
);
403 fprintf(stderr
, "ok\n");
407 test_disallowed_in_pipeline(PGconn
*conn
)
409 PGresult
*res
= NULL
;
411 fprintf(stderr
, "test error cases... ");
413 if (PQisnonblocking(conn
))
414 pg_fatal("Expected blocking connection mode");
416 if (PQenterPipelineMode(conn
) != 1)
417 pg_fatal("Unable to enter pipeline mode");
419 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
420 pg_fatal("Pipeline mode not activated properly");
422 /* PQexec should fail in pipeline mode */
423 res
= PQexec(conn
, "SELECT 1");
424 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
425 pg_fatal("PQexec should fail in pipeline mode but succeeded");
426 if (strcmp(PQerrorMessage(conn
),
427 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
428 pg_fatal("did not get expected error message; got: \"%s\"",
429 PQerrorMessage(conn
));
431 /* PQsendQuery should fail in pipeline mode */
432 if (PQsendQuery(conn
, "SELECT 1") != 0)
433 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
434 if (strcmp(PQerrorMessage(conn
),
435 "PQsendQuery not allowed in pipeline mode\n") != 0)
436 pg_fatal("did not get expected error message; got: \"%s\"",
437 PQerrorMessage(conn
));
439 /* Entering pipeline mode when already in pipeline mode is OK */
440 if (PQenterPipelineMode(conn
) != 1)
441 pg_fatal("re-entering pipeline mode should be a no-op but failed");
443 if (PQisBusy(conn
) != 0)
444 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
446 /* ok, back to normal command mode */
447 if (PQexitPipelineMode(conn
) != 1)
448 pg_fatal("couldn't exit idle empty pipeline mode");
450 if (PQpipelineStatus(conn
) != PQ_PIPELINE_OFF
)
451 pg_fatal("Pipeline mode not terminated properly");
453 /* exiting pipeline mode when not in pipeline mode should be a no-op */
454 if (PQexitPipelineMode(conn
) != 1)
455 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
457 /* can now PQexec again */
458 res
= PQexec(conn
, "SELECT 1");
459 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
460 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
461 PQerrorMessage(conn
));
463 fprintf(stderr
, "ok\n");
467 test_multi_pipelines(PGconn
*conn
)
469 PGresult
*res
= NULL
;
470 const char *dummy_params
[1] = {"1"};
471 Oid dummy_param_oids
[1] = {INT4OID
};
473 fprintf(stderr
, "multi pipeline... ");
476 * Queue up a couple of small pipelines and process each without returning
477 * to command mode first.
479 if (PQenterPipelineMode(conn
) != 1)
480 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
483 if (PQsendQueryParams(conn
, "SELECT $1", 1, dummy_param_oids
,
484 dummy_params
, NULL
, NULL
, 0) != 1)
485 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn
));
487 if (PQpipelineSync(conn
) != 1)
488 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn
));
490 /* second pipeline */
491 if (PQsendQueryParams(conn
, "SELECT $1", 1, dummy_param_oids
,
492 dummy_params
, NULL
, NULL
, 0) != 1)
493 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn
));
495 /* Skip flushing once. */
496 if (PQsendPipelineSync(conn
) != 1)
497 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn
));
500 if (PQsendQueryParams(conn
, "SELECT $1", 1, dummy_param_oids
,
501 dummy_params
, NULL
, NULL
, 0) != 1)
502 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn
));
504 if (PQpipelineSync(conn
) != 1)
505 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
507 /* OK, start processing the results */
511 res
= PQgetResult(conn
);
513 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
514 PQerrorMessage(conn
));
516 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
517 pg_fatal("Unexpected result code %s from first pipeline item",
518 PQresStatus(PQresultStatus(res
)));
522 if (PQgetResult(conn
) != NULL
)
523 pg_fatal("PQgetResult returned something extra after first result");
525 if (PQexitPipelineMode(conn
) != 0)
526 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
528 res
= PQgetResult(conn
);
530 pg_fatal("PQgetResult returned null when sync result expected: %s",
531 PQerrorMessage(conn
));
533 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
534 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
535 PQresStatus(PQresultStatus(res
)), PQerrorMessage(conn
));
538 /* second pipeline */
540 res
= PQgetResult(conn
);
542 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
543 PQerrorMessage(conn
));
545 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
546 pg_fatal("Unexpected result code %s from second pipeline item",
547 PQresStatus(PQresultStatus(res
)));
551 if (PQgetResult(conn
) != NULL
)
552 pg_fatal("PQgetResult returned something extra after first result");
554 if (PQexitPipelineMode(conn
) != 0)
555 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
557 res
= PQgetResult(conn
);
559 pg_fatal("PQgetResult returned null when sync result expected: %s",
560 PQerrorMessage(conn
));
562 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
563 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
564 PQresStatus(PQresultStatus(res
)), PQerrorMessage(conn
));
569 res
= PQgetResult(conn
);
571 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
572 PQerrorMessage(conn
));
574 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
575 pg_fatal("Unexpected result code %s from third pipeline item",
576 PQresStatus(PQresultStatus(res
)));
578 res
= PQgetResult(conn
);
580 pg_fatal("Expected null result, got %s",
581 PQresStatus(PQresultStatus(res
)));
583 res
= PQgetResult(conn
);
585 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
586 PQerrorMessage(conn
));
588 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
589 pg_fatal("Unexpected result code %s from second pipeline sync",
590 PQresStatus(PQresultStatus(res
)));
592 /* We're still in pipeline mode ... */
593 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
594 pg_fatal("Fell out of pipeline mode somehow");
596 /* until we end it, which we can safely do now */
597 if (PQexitPipelineMode(conn
) != 1)
598 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
599 PQerrorMessage(conn
));
601 if (PQpipelineStatus(conn
) != PQ_PIPELINE_OFF
)
602 pg_fatal("exiting pipeline mode didn't seem to work");
604 fprintf(stderr
, "ok\n");
608 * Test behavior when a pipeline dispatches a number of commands that are
609 * not flushed by a sync point.
612 test_nosync(PGconn
*conn
)
616 int sock
= PQsocket(conn
);
618 fprintf(stderr
, "nosync... ");
621 pg_fatal("invalid socket");
623 if (PQenterPipelineMode(conn
) != 1)
624 pg_fatal("could not enter pipeline mode");
625 for (int i
= 0; i
< numqueries
; i
++)
630 if (PQsendQueryParams(conn
, "SELECT repeat('xyzxz', 12)",
631 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
632 pg_fatal("error sending select: %s", PQerrorMessage(conn
));
636 * If the server has written anything to us, read (some of) it now.
638 FD_ZERO(&input_mask
);
639 FD_SET(sock
, &input_mask
);
642 if (select(sock
+ 1, &input_mask
, NULL
, NULL
, &tv
) < 0)
644 fprintf(stderr
, "select() failed: %m\n");
647 if (FD_ISSET(sock
, &input_mask
) && PQconsumeInput(conn
) != 1)
648 pg_fatal("failed to read from server: %s", PQerrorMessage(conn
));
651 /* tell server to flush its output buffer */
652 if (PQsendFlushRequest(conn
) != 1)
653 pg_fatal("failed to send flush request");
656 /* Now read all results */
661 res
= PQgetResult(conn
);
663 /* NULL results are only expected after TUPLES_OK */
665 pg_fatal("got unexpected NULL result after %d results", results
);
667 /* We expect exactly one TUPLES_OK result for each query we sent */
668 if (PQresultStatus(res
) == PGRES_TUPLES_OK
)
672 /* and one NULL result should follow each */
673 res2
= PQgetResult(conn
);
675 pg_fatal("expected NULL, got %s",
676 PQresStatus(PQresultStatus(res2
)));
680 /* if we're done, we're done */
681 if (results
== numqueries
)
687 /* anything else is unexpected */
688 pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res
)));
691 fprintf(stderr
, "ok\n");
695 * When an operation in a pipeline fails the rest of the pipeline is flushed. We
696 * still have to get results for each pipeline item, but the item will just be
697 * a PGRES_PIPELINE_ABORTED code.
699 * This intentionally doesn't use a transaction to wrap the pipeline. You should
700 * usually use an xact, but in this case we want to observe the effects of each
704 test_pipeline_abort(PGconn
*conn
)
706 PGresult
*res
= NULL
;
707 const char *dummy_params
[1] = {"1"};
708 Oid dummy_param_oids
[1] = {INT4OID
};
713 fprintf(stderr
, "aborted pipeline... ");
715 res
= PQexec(conn
, drop_table_sql
);
716 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
717 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn
));
719 res
= PQexec(conn
, create_table_sql
);
720 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
721 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn
));
724 * Queue up a couple of small pipelines and process each without returning
725 * to command mode first. Make sure the second operation in the first
728 if (PQenterPipelineMode(conn
) != 1)
729 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
731 dummy_params
[0] = "1";
732 if (PQsendQueryParams(conn
, insert_sql
, 1, dummy_param_oids
,
733 dummy_params
, NULL
, NULL
, 0) != 1)
734 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn
));
736 if (PQsendQueryParams(conn
, "SELECT no_such_function($1)",
737 1, dummy_param_oids
, dummy_params
,
739 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn
));
741 dummy_params
[0] = "2";
742 if (PQsendQueryParams(conn
, insert_sql
, 1, dummy_param_oids
,
743 dummy_params
, NULL
, NULL
, 0) != 1)
744 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn
));
746 if (PQpipelineSync(conn
) != 1)
747 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
749 dummy_params
[0] = "3";
750 if (PQsendQueryParams(conn
, insert_sql
, 1, dummy_param_oids
,
751 dummy_params
, NULL
, NULL
, 0) != 1)
752 pg_fatal("dispatching second-pipeline insert failed: %s",
753 PQerrorMessage(conn
));
755 if (PQpipelineSync(conn
) != 1)
756 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
759 * OK, start processing the pipeline results.
761 * We should get a command-ok for the first query, then a fatal error and
762 * a pipeline aborted message for the second insert, a pipeline-end, then
763 * a command-ok and a pipeline-ok for the second pipeline operation.
765 res
= PQgetResult(conn
);
767 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
768 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
769 pg_fatal("Unexpected result status %s: %s",
770 PQresStatus(PQresultStatus(res
)),
771 PQresultErrorMessage(res
));
774 /* NULL result to signal end-of-results for this command */
775 if ((res
= PQgetResult(conn
)) != NULL
)
776 pg_fatal("Expected null result, got %s",
777 PQresStatus(PQresultStatus(res
)));
779 /* Second query caused error, so we expect an error next */
780 res
= PQgetResult(conn
);
782 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
783 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
784 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
785 PQresStatus(PQresultStatus(res
)));
788 /* NULL result to signal end-of-results for this command */
789 if ((res
= PQgetResult(conn
)) != NULL
)
790 pg_fatal("Expected null result, got %s",
791 PQresStatus(PQresultStatus(res
)));
794 * pipeline should now be aborted.
796 * Note that we could still queue more queries at this point if we wanted;
797 * they'd get added to a new third pipeline since we've already sent a
798 * second. The aborted flag relates only to the pipeline being received.
800 if (PQpipelineStatus(conn
) != PQ_PIPELINE_ABORTED
)
801 pg_fatal("pipeline should be flagged as aborted but isn't");
803 /* third query in pipeline, the second insert */
804 res
= PQgetResult(conn
);
806 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
807 if (PQresultStatus(res
) != PGRES_PIPELINE_ABORTED
)
808 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
809 PQresStatus(PQresultStatus(res
)));
812 /* NULL result to signal end-of-results for this command */
813 if ((res
= PQgetResult(conn
)) != NULL
)
814 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res
)));
816 if (PQpipelineStatus(conn
) != PQ_PIPELINE_ABORTED
)
817 pg_fatal("pipeline should be flagged as aborted but isn't");
819 /* Ensure we're still in pipeline */
820 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
821 pg_fatal("Fell out of pipeline mode somehow");
824 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
826 * (This is so clients know to start processing results normally again and
827 * can tell the difference between skipped commands and the sync.)
829 res
= PQgetResult(conn
);
831 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
832 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
833 pg_fatal("Unexpected result code from first pipeline sync\n"
834 "Expected PGRES_PIPELINE_SYNC, got %s",
835 PQresStatus(PQresultStatus(res
)));
838 if (PQpipelineStatus(conn
) == PQ_PIPELINE_ABORTED
)
839 pg_fatal("sync should've cleared the aborted flag but didn't");
841 /* We're still in pipeline mode... */
842 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
843 pg_fatal("Fell out of pipeline mode somehow");
845 /* the insert from the second pipeline */
846 res
= PQgetResult(conn
);
848 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
849 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
850 pg_fatal("Unexpected result code %s from first item in second pipeline",
851 PQresStatus(PQresultStatus(res
)));
854 /* Read the NULL result at the end of the command */
855 if ((res
= PQgetResult(conn
)) != NULL
)
856 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res
)));
858 /* the second pipeline sync */
859 if ((res
= PQgetResult(conn
)) == NULL
)
860 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
861 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
862 pg_fatal("Unexpected result code %s from second pipeline sync",
863 PQresStatus(PQresultStatus(res
)));
866 if ((res
= PQgetResult(conn
)) != NULL
)
867 pg_fatal("Expected null result, got %s: %s",
868 PQresStatus(PQresultStatus(res
)),
869 PQerrorMessage(conn
));
871 /* Try to send two queries in one command */
872 if (PQsendQueryParams(conn
, "SELECT 1; SELECT 2", 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
873 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
874 if (PQpipelineSync(conn
) != 1)
875 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
877 while ((res
= PQgetResult(conn
)) != NULL
)
879 switch (PQresultStatus(res
))
881 case PGRES_FATAL_ERROR
:
882 if (strcmp(PQresultErrorField(res
, PG_DIAG_SQLSTATE
), "42601") != 0)
883 pg_fatal("expected error about multiple commands, got %s",
884 PQerrorMessage(conn
));
885 printf("got expected %s", PQerrorMessage(conn
));
889 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res
)));
894 pg_fatal("did not get cannot-insert-multiple-commands error");
895 res
= PQgetResult(conn
);
897 pg_fatal("got NULL result");
898 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
899 pg_fatal("Unexpected result code %s from pipeline sync",
900 PQresStatus(PQresultStatus(res
)));
901 fprintf(stderr
, "ok\n");
903 /* Test single-row mode with an error partways */
904 if (PQsendQueryParams(conn
, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
905 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
906 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
907 if (PQpipelineSync(conn
) != 1)
908 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
909 PQsetSingleRowMode(conn
);
912 while ((res
= PQgetResult(conn
)) != NULL
)
914 switch (PQresultStatus(res
))
916 case PGRES_SINGLE_TUPLE
:
917 printf("got row: %s\n", PQgetvalue(res
, 0, 0));
920 case PGRES_FATAL_ERROR
:
921 if (strcmp(PQresultErrorField(res
, PG_DIAG_SQLSTATE
), "22012") != 0)
922 pg_fatal("expected division-by-zero, got: %s (%s)",
923 PQerrorMessage(conn
),
924 PQresultErrorField(res
, PG_DIAG_SQLSTATE
));
925 printf("got expected division-by-zero\n");
929 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res
)));
934 pg_fatal("did not get division-by-zero error");
936 pg_fatal("did not get three rows");
937 /* the third pipeline sync */
938 if ((res
= PQgetResult(conn
)) == NULL
)
939 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
940 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
941 pg_fatal("Unexpected result code %s from third pipeline sync",
942 PQresStatus(PQresultStatus(res
)));
945 /* We're still in pipeline mode... */
946 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
947 pg_fatal("Fell out of pipeline mode somehow");
949 /* until we end it, which we can safely do now */
950 if (PQexitPipelineMode(conn
) != 1)
951 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
952 PQerrorMessage(conn
));
954 if (PQpipelineStatus(conn
) != PQ_PIPELINE_OFF
)
955 pg_fatal("exiting pipeline mode didn't seem to work");
958 * Since we fired the pipelines off without a surrounding xact, the results
961 * - Implicit xact started by server around 1st pipeline
962 * - First insert applied
963 * - Second statement aborted xact
964 * - Third insert skipped
965 * - Sync rolled back first implicit xact
966 * - Implicit xact created by server around 2nd pipeline
967 * - insert applied from 2nd pipeline
968 * - Sync commits 2nd xact
970 * So we should only have the value 3 that we inserted.
972 res
= PQexec(conn
, "SELECT itemno FROM pq_pipeline_demo");
974 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
975 pg_fatal("Expected tuples, got %s: %s",
976 PQresStatus(PQresultStatus(res
)), PQerrorMessage(conn
));
977 if (PQntuples(res
) != 1)
978 pg_fatal("expected 1 result, got %d", PQntuples(res
));
979 for (i
= 0; i
< PQntuples(res
); i
++)
981 const char *val
= PQgetvalue(res
, i
, 0);
983 if (strcmp(val
, "3") != 0)
984 pg_fatal("expected only insert with value 3, got %s", val
);
989 fprintf(stderr
, "ok\n");
992 /* State machine enum for test_pipelined_insert */
993 enum PipelineInsertStep
1006 test_pipelined_insert(PGconn
*conn
, int n_rows
)
1008 Oid insert_param_oids
[2] = {INT4OID
, INT8OID
};
1009 const char *insert_params
[2];
1010 char insert_param_0
[MAXINTLEN
];
1011 char insert_param_1
[MAXINT8LEN
];
1012 enum PipelineInsertStep send_step
= BI_BEGIN_TX
,
1013 recv_step
= BI_BEGIN_TX
;
1017 insert_params
[0] = insert_param_0
;
1018 insert_params
[1] = insert_param_1
;
1020 rows_to_send
= rows_to_receive
= n_rows
;
1023 * Do a pipelined insert into a table created at the start of the pipeline
1025 if (PQenterPipelineMode(conn
) != 1)
1026 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
1028 while (send_step
!= BI_PREPARE
)
1035 sql
= "BEGIN TRANSACTION";
1036 send_step
= BI_DROP_TABLE
;
1040 sql
= drop_table_sql
;
1041 send_step
= BI_CREATE_TABLE
;
1044 case BI_CREATE_TABLE
:
1045 sql
= create_table_sql
;
1046 send_step
= BI_PREPARE
;
1050 pg_fatal("invalid state");
1051 sql
= NULL
; /* keep compiler quiet */
1054 pg_debug("sending: %s\n", sql
);
1055 if (PQsendQueryParams(conn
, sql
,
1056 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1057 pg_fatal("dispatching %s failed: %s", sql
, PQerrorMessage(conn
));
1060 Assert(send_step
== BI_PREPARE
);
1061 pg_debug("sending: %s\n", insert_sql2
);
1062 if (PQsendPrepare(conn
, "my_insert", insert_sql2
, 2, insert_param_oids
) != 1)
1063 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn
));
1064 send_step
= BI_INSERT_ROWS
;
1067 * Now we start inserting. We'll be sending enough data that we could fill
1068 * our output buffer, so to avoid deadlocking we need to enter nonblocking
1069 * mode and consume input while we send more output. As results of each
1070 * query are processed we should pop them to allow processing of the next
1071 * query. There's no need to finish the pipeline before processing
1074 if (PQsetnonblocking(conn
, 1) != 0)
1075 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn
));
1077 while (recv_step
!= BI_DONE
)
1083 sock
= PQsocket(conn
);
1086 break; /* shouldn't happen */
1088 FD_ZERO(&input_mask
);
1089 FD_SET(sock
, &input_mask
);
1090 FD_ZERO(&output_mask
);
1091 FD_SET(sock
, &output_mask
);
1093 if (select(sock
+ 1, &input_mask
, &output_mask
, NULL
, NULL
) < 0)
1095 fprintf(stderr
, "select() failed: %m\n");
1100 * Process any results, so we keep the server's output buffer free
1101 * flowing and it can continue to process input
1103 if (FD_ISSET(sock
, &input_mask
))
1105 PQconsumeInput(conn
);
1107 /* Read until we'd block if we tried to read */
1108 while (!PQisBusy(conn
) && recv_step
< BI_DONE
)
1111 const char *cmdtag
= "";
1112 const char *description
= "";
1116 * Read next result. If no more results from this query,
1117 * advance to the next query
1119 res
= PQgetResult(conn
);
1123 status
= PGRES_COMMAND_OK
;
1131 cmdtag
= "DROP TABLE";
1134 case BI_CREATE_TABLE
:
1135 cmdtag
= "CREATE TABLE";
1140 description
= "PREPARE";
1143 case BI_INSERT_ROWS
:
1146 if (rows_to_receive
== 0)
1155 description
= "SYNC";
1156 status
= PGRES_PIPELINE_SYNC
;
1161 pg_fatal("unreachable state");
1164 if (PQresultStatus(res
) != status
)
1165 pg_fatal("%s reported status %s, expected %s\n"
1166 "Error message: \"%s\"",
1167 description
, PQresStatus(PQresultStatus(res
)),
1168 PQresStatus(status
), PQerrorMessage(conn
));
1170 if (strncmp(PQcmdStatus(res
), cmdtag
, strlen(cmdtag
)) != 0)
1171 pg_fatal("%s expected command tag '%s', got '%s'",
1172 description
, cmdtag
, PQcmdStatus(res
));
1174 pg_debug("Got %s OK\n", cmdtag
[0] != '\0' ? cmdtag
: description
);
1180 /* Write more rows and/or the end pipeline message, if needed */
1181 if (FD_ISSET(sock
, &output_mask
))
1185 if (send_step
== BI_INSERT_ROWS
)
1187 snprintf(insert_param_0
, MAXINTLEN
, "%d", rows_to_send
);
1188 /* use up some buffer space with a wide value */
1189 snprintf(insert_param_1
, MAXINT8LEN
, "%lld", 1LL << 62);
1191 if (PQsendQueryPrepared(conn
, "my_insert",
1192 2, insert_params
, NULL
, NULL
, 0) == 1)
1194 pg_debug("sent row %d\n", rows_to_send
);
1197 if (rows_to_send
== 0)
1203 * in nonblocking mode, so it's OK for an insert to fail
1206 fprintf(stderr
, "WARNING: failed to send insert #%d: %s\n",
1207 rows_to_send
, PQerrorMessage(conn
));
1210 else if (send_step
== BI_COMMIT_TX
)
1212 if (PQsendQueryParams(conn
, "COMMIT",
1213 0, NULL
, NULL
, NULL
, NULL
, 0) == 1)
1215 pg_debug("sent COMMIT\n");
1220 fprintf(stderr
, "WARNING: failed to send commit: %s\n",
1221 PQerrorMessage(conn
));
1224 else if (send_step
== BI_SYNC
)
1226 if (PQpipelineSync(conn
) == 1)
1228 fprintf(stdout
, "pipeline sync sent\n");
1233 fprintf(stderr
, "WARNING: pipeline sync failed: %s\n",
1234 PQerrorMessage(conn
));
1240 /* We've got the sync message and the pipeline should be done */
1241 if (PQexitPipelineMode(conn
) != 1)
1242 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1243 PQerrorMessage(conn
));
1245 if (PQsetnonblocking(conn
, 0) != 0)
1246 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn
));
1248 fprintf(stderr
, "ok\n");
1252 test_prepared(PGconn
*conn
)
1254 PGresult
*res
= NULL
;
1255 Oid param_oids
[1] = {INT4OID
};
1256 Oid expected_oids
[4];
1259 fprintf(stderr
, "prepared... ");
1261 if (PQenterPipelineMode(conn
) != 1)
1262 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
1263 if (PQsendPrepare(conn
, "select_one", "SELECT $1, '42', $1::numeric, "
1265 1, param_oids
) != 1)
1266 pg_fatal("preparing query failed: %s", PQerrorMessage(conn
));
1267 expected_oids
[0] = INT4OID
;
1268 expected_oids
[1] = TEXTOID
;
1269 expected_oids
[2] = NUMERICOID
;
1270 expected_oids
[3] = INTERVALOID
;
1271 if (PQsendDescribePrepared(conn
, "select_one") != 1)
1272 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn
));
1273 if (PQpipelineSync(conn
) != 1)
1274 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1276 res
= PQgetResult(conn
);
1278 pg_fatal("PQgetResult returned null");
1279 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1280 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1282 res
= PQgetResult(conn
);
1284 pg_fatal("expected NULL result");
1286 res
= PQgetResult(conn
);
1288 pg_fatal("PQgetResult returned NULL");
1289 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1290 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1291 if (PQnfields(res
) != lengthof(expected_oids
))
1292 pg_fatal("expected %zu columns, got %d",
1293 lengthof(expected_oids
), PQnfields(res
));
1294 for (int i
= 0; i
< PQnfields(res
); i
++)
1296 typ
= PQftype(res
, i
);
1297 if (typ
!= expected_oids
[i
])
1298 pg_fatal("field %d: expected type %u, got %u",
1299 i
, expected_oids
[i
], typ
);
1302 res
= PQgetResult(conn
);
1304 pg_fatal("expected NULL result");
1306 res
= PQgetResult(conn
);
1307 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1308 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res
)));
1310 fprintf(stderr
, "closing statement..");
1311 if (PQsendClosePrepared(conn
, "select_one") != 1)
1312 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn
));
1313 if (PQpipelineSync(conn
) != 1)
1314 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1316 res
= PQgetResult(conn
);
1318 pg_fatal("expected non-NULL result");
1319 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1320 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1322 res
= PQgetResult(conn
);
1324 pg_fatal("expected NULL result");
1325 res
= PQgetResult(conn
);
1326 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1327 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res
)));
1329 if (PQexitPipelineMode(conn
) != 1)
1330 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn
));
1332 /* Now that it's closed we should get an error when describing */
1333 res
= PQdescribePrepared(conn
, "select_one");
1334 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
1335 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res
)));
1338 * Also test the blocking close, this should not fail since closing a
1339 * non-existent prepared statement is a no-op
1341 res
= PQclosePrepared(conn
, "select_one");
1342 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1343 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1345 fprintf(stderr
, "creating portal... ");
1346 PQexec(conn
, "BEGIN");
1347 PQexec(conn
, "DECLARE cursor_one CURSOR FOR SELECT 1");
1348 PQenterPipelineMode(conn
);
1349 if (PQsendDescribePortal(conn
, "cursor_one") != 1)
1350 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn
));
1351 if (PQpipelineSync(conn
) != 1)
1352 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1353 res
= PQgetResult(conn
);
1355 pg_fatal("PQgetResult returned null");
1356 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1357 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1359 typ
= PQftype(res
, 0);
1361 pg_fatal("portal: expected type %u, got %u",
1364 res
= PQgetResult(conn
);
1366 pg_fatal("expected NULL result");
1367 res
= PQgetResult(conn
);
1368 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1369 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res
)));
1371 fprintf(stderr
, "closing portal... ");
1372 if (PQsendClosePortal(conn
, "cursor_one") != 1)
1373 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn
));
1374 if (PQpipelineSync(conn
) != 1)
1375 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1377 res
= PQgetResult(conn
);
1379 pg_fatal("expected non-NULL result");
1380 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1381 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1383 res
= PQgetResult(conn
);
1385 pg_fatal("expected NULL result");
1386 res
= PQgetResult(conn
);
1387 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1388 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res
)));
1390 if (PQexitPipelineMode(conn
) != 1)
1391 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn
));
1393 /* Now that it's closed we should get an error when describing */
1394 res
= PQdescribePortal(conn
, "cursor_one");
1395 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
1396 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res
)));
1399 * Also test the blocking close, this should not fail since closing a
1400 * non-existent portal is a no-op
1402 res
= PQclosePortal(conn
, "cursor_one");
1403 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1404 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1406 fprintf(stderr
, "ok\n");
/*
 * Notice processor: print notices, and count how many we got
 *
 * "arg" must point to an int counter owned by the caller; it is incremented
 * once per notice, and the new count is printed in front of the message.
 * The message is printed as-is, with no newline added.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *n_notices = (int *) arg;

	(*n_notices)++;
	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
}
1419 /* Verify behavior in "idle" state */
1421 test_pipeline_idle(PGconn
*conn
)
1426 fprintf(stderr
, "\npipeline idle...\n");
1428 PQsetNoticeProcessor(conn
, notice_processor
, &n_notices
);
1430 /* Try to exit pipeline mode in pipeline-idle state */
1431 if (PQenterPipelineMode(conn
) != 1)
1432 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
1433 if (PQsendQueryParams(conn
, "SELECT 1", 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1434 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
1435 PQsendFlushRequest(conn
);
1436 res
= PQgetResult(conn
);
1438 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1439 PQerrorMessage(conn
));
1440 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1441 pg_fatal("unexpected result code %s from first pipeline item",
1442 PQresStatus(PQresultStatus(res
)));
1444 res
= PQgetResult(conn
);
1446 pg_fatal("did not receive terminating NULL");
1447 if (PQsendQueryParams(conn
, "SELECT 2", 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1448 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
1449 if (PQexitPipelineMode(conn
) == 1)
1450 pg_fatal("exiting pipeline succeeded when it shouldn't");
1451 if (strncmp(PQerrorMessage(conn
), "cannot exit pipeline mode",
1452 strlen("cannot exit pipeline mode")) != 0)
1453 pg_fatal("did not get expected error; got: %s",
1454 PQerrorMessage(conn
));
1455 PQsendFlushRequest(conn
);
1456 res
= PQgetResult(conn
);
1457 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1458 pg_fatal("unexpected result code %s from second pipeline item",
1459 PQresStatus(PQresultStatus(res
)));
1461 res
= PQgetResult(conn
);
1463 pg_fatal("did not receive terminating NULL");
1464 if (PQexitPipelineMode(conn
) != 1)
1465 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn
));
1468 pg_fatal("got %d notice(s)", n_notices
);
1469 fprintf(stderr
, "ok - 1\n");
1471 /* Have a WARNING in the middle of a resultset */
1472 if (PQenterPipelineMode(conn
) != 1)
1473 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn
));
1474 if (PQsendQueryParams(conn
, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1475 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
1476 PQsendFlushRequest(conn
);
1477 res
= PQgetResult(conn
);
1479 pg_fatal("unexpected NULL result received");
1480 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1481 pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res
)));
1482 if (PQexitPipelineMode(conn
) != 1)
1483 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn
));
1484 fprintf(stderr
, "ok - 2\n");
1488 test_simple_pipeline(PGconn
*conn
)
1490 PGresult
*res
= NULL
;
1491 const char *dummy_params
[1] = {"1"};
1492 Oid dummy_param_oids
[1] = {INT4OID
};
1494 fprintf(stderr
, "simple pipeline... ");
1497 * Enter pipeline mode and dispatch a set of operations, which we'll then
1498 * process the results of as they come in.
1500 * For a simple case we should be able to do this without interim
1501 * processing of results since our output buffer will give us enough slush
1502 * to work with and we won't block on sending. So blocking mode is fine.
1504 if (PQisnonblocking(conn
))
1505 pg_fatal("Expected blocking connection mode");
1507 if (PQenterPipelineMode(conn
) != 1)
1508 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
1510 if (PQsendQueryParams(conn
, "SELECT $1",
1511 1, dummy_param_oids
, dummy_params
,
1512 NULL
, NULL
, 0) != 1)
1513 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn
));
1515 if (PQexitPipelineMode(conn
) != 0)
1516 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1518 if (PQpipelineSync(conn
) != 1)
1519 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1521 res
= PQgetResult(conn
);
1523 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1524 PQerrorMessage(conn
));
1526 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1527 pg_fatal("Unexpected result code %s from first pipeline item",
1528 PQresStatus(PQresultStatus(res
)));
1533 if (PQgetResult(conn
) != NULL
)
1534 pg_fatal("PQgetResult returned something extra after first query result.");
1537 * Even though we've processed the result there's still a sync to come and
1538 * we can't exit pipeline mode yet
1540 if (PQexitPipelineMode(conn
) != 0)
1541 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1543 res
= PQgetResult(conn
);
1545 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1546 PQerrorMessage(conn
));
1548 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1549 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1550 PQresStatus(PQresultStatus(res
)), PQerrorMessage(conn
));
1555 if (PQgetResult(conn
) != NULL
)
1556 pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1557 PQresStatus(PQresultStatus(res
)));
1559 /* We're still in pipeline mode... */
1560 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
1561 pg_fatal("Fell out of pipeline mode somehow");
1563 /* ... until we end it, which we can safely do now */
1564 if (PQexitPipelineMode(conn
) != 1)
1565 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1566 PQerrorMessage(conn
));
1568 if (PQpipelineStatus(conn
) != PQ_PIPELINE_OFF
)
1569 pg_fatal("Exiting pipeline mode didn't seem to work");
1571 fprintf(stderr
, "ok\n");
1575 test_singlerowmode(PGconn
*conn
)
1579 bool pipeline_ended
= false;
1581 if (PQenterPipelineMode(conn
) != 1)
1582 pg_fatal("failed to enter pipeline mode: %s",
1583 PQerrorMessage(conn
));
1585 /* One series of three commands, using single-row mode for the first two. */
1586 for (i
= 0; i
< 3; i
++)
1590 param
[0] = psprintf("%d", 44 + i
);
1592 if (PQsendQueryParams(conn
,
1593 "SELECT generate_series(42, $1)",
1596 (const char **) param
,
1600 pg_fatal("failed to send query: %s",
1601 PQerrorMessage(conn
));
1604 if (PQpipelineSync(conn
) != 1)
1605 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1607 for (i
= 0; !pipeline_ended
; i
++)
1610 bool saw_ending_tuplesok
;
1611 bool isSingleTuple
= false;
1613 /* Set single row mode for only first 2 SELECT queries */
1616 if (PQsetSingleRowMode(conn
) != 1)
1617 pg_fatal("PQsetSingleRowMode() failed for i=%d", i
);
1620 /* Consume rows for this query */
1621 saw_ending_tuplesok
= false;
1622 while ((res
= PQgetResult(conn
)) != NULL
)
1624 ExecStatusType est
= PQresultStatus(res
);
1626 if (est
== PGRES_PIPELINE_SYNC
)
1628 fprintf(stderr
, "end of pipeline reached\n");
1629 pipeline_ended
= true;
1632 pg_fatal("Expected three results, got %d", i
);
1636 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1639 if (i
<= 1 && est
!= PGRES_SINGLE_TUPLE
)
1640 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1641 i
, PQresStatus(est
));
1642 if (i
>= 2 && est
!= PGRES_TUPLES_OK
)
1643 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1644 i
, PQresStatus(est
));
1648 fprintf(stderr
, "Result status %s for query %d", PQresStatus(est
), i
);
1651 case PGRES_TUPLES_OK
:
1652 fprintf(stderr
, ", tuples: %d\n", PQntuples(res
));
1653 saw_ending_tuplesok
= true;
1656 if (PQntuples(res
) == 0)
1657 fprintf(stderr
, "all tuples received in query %d\n", i
);
1659 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1663 case PGRES_SINGLE_TUPLE
:
1664 isSingleTuple
= true;
1665 fprintf(stderr
, ", %d tuple: %s\n", PQntuples(res
), PQgetvalue(res
, 0, 0));
1669 pg_fatal("unexpected");
1673 if (!pipeline_ended
&& !saw_ending_tuplesok
)
1674 pg_fatal("didn't get expected terminating TUPLES_OK");
1678 * Now issue one command, get its results in with single-row mode, then
1679 * issue another command, and get its results in normal mode; make sure
1680 * the single-row mode flag is reset as expected.
1682 if (PQsendQueryParams(conn
, "SELECT generate_series(0, 0)",
1683 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1684 pg_fatal("failed to send query: %s",
1685 PQerrorMessage(conn
));
1686 if (PQsendFlushRequest(conn
) != 1)
1687 pg_fatal("failed to send flush request");
1688 if (PQsetSingleRowMode(conn
) != 1)
1689 pg_fatal("PQsetSingleRowMode() failed");
1690 res
= PQgetResult(conn
);
1692 pg_fatal("unexpected NULL");
1693 if (PQresultStatus(res
) != PGRES_SINGLE_TUPLE
)
1694 pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1695 PQresStatus(PQresultStatus(res
)));
1696 res
= PQgetResult(conn
);
1698 pg_fatal("unexpected NULL");
1699 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1700 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1701 PQresStatus(PQresultStatus(res
)));
1702 if (PQgetResult(conn
) != NULL
)
1703 pg_fatal("expected NULL result");
1705 if (PQsendQueryParams(conn
, "SELECT 1",
1706 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1707 pg_fatal("failed to send query: %s",
1708 PQerrorMessage(conn
));
1709 if (PQsendFlushRequest(conn
) != 1)
1710 pg_fatal("failed to send flush request");
1711 res
= PQgetResult(conn
);
1713 pg_fatal("unexpected NULL");
1714 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1715 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1716 PQresStatus(PQresultStatus(res
)));
1717 if (PQgetResult(conn
) != NULL
)
1718 pg_fatal("expected NULL result");
1721 * Try chunked mode as well; make sure that it correctly delivers a
1722 * partial final chunk.
1724 if (PQsendQueryParams(conn
, "SELECT generate_series(1, 5)",
1725 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1726 pg_fatal("failed to send query: %s",
1727 PQerrorMessage(conn
));
1728 if (PQsendFlushRequest(conn
) != 1)
1729 pg_fatal("failed to send flush request");
1730 if (PQsetChunkedRowsMode(conn
, 3) != 1)
1731 pg_fatal("PQsetChunkedRowsMode() failed");
1732 res
= PQgetResult(conn
);
1734 pg_fatal("unexpected NULL");
1735 if (PQresultStatus(res
) != PGRES_TUPLES_CHUNK
)
1736 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1737 PQresStatus(PQresultStatus(res
)),
1738 PQerrorMessage(conn
));
1739 if (PQntuples(res
) != 3)
1740 pg_fatal("Expected 3 rows, got %d", PQntuples(res
));
1741 res
= PQgetResult(conn
);
1743 pg_fatal("unexpected NULL");
1744 if (PQresultStatus(res
) != PGRES_TUPLES_CHUNK
)
1745 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1746 PQresStatus(PQresultStatus(res
)));
1747 if (PQntuples(res
) != 2)
1748 pg_fatal("Expected 2 rows, got %d", PQntuples(res
));
1749 res
= PQgetResult(conn
);
1751 pg_fatal("unexpected NULL");
1752 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1753 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1754 PQresStatus(PQresultStatus(res
)));
1755 if (PQntuples(res
) != 0)
1756 pg_fatal("Expected 0 rows, got %d", PQntuples(res
));
1757 if (PQgetResult(conn
) != NULL
)
1758 pg_fatal("expected NULL result");
1760 if (PQexitPipelineMode(conn
) != 1)
1761 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn
));
1763 fprintf(stderr
, "ok\n");
1767 * Simple test to verify that a pipeline is discarded as a whole when there's
1768 * an error, ignoring transaction commands.
1771 test_transaction(PGconn
*conn
)
1777 res
= PQexec(conn
, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1778 "CREATE TABLE pq_pipeline_tst (id int)");
1779 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1780 pg_fatal("failed to create test table: %s",
1781 PQerrorMessage(conn
));
1784 if (PQenterPipelineMode(conn
) != 1)
1785 pg_fatal("failed to enter pipeline mode: %s",
1786 PQerrorMessage(conn
));
1787 if (PQsendPrepare(conn
, "rollback", "ROLLBACK", 0, NULL
) != 1)
1788 pg_fatal("could not send prepare on pipeline: %s",
1789 PQerrorMessage(conn
));
1791 if (PQsendQueryParams(conn
,
1793 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1794 pg_fatal("failed to send query: %s",
1795 PQerrorMessage(conn
));
1796 if (PQsendQueryParams(conn
,
1798 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1799 pg_fatal("failed to send query: %s",
1800 PQerrorMessage(conn
));
1803 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1804 * get out of the pipeline-aborted state first.
1806 if (PQsendQueryPrepared(conn
, "rollback", 0, NULL
, NULL
, NULL
, 1) != 1)
1807 pg_fatal("failed to execute prepared: %s",
1808 PQerrorMessage(conn
));
1810 /* This insert fails because we're in pipeline-aborted state */
1811 if (PQsendQueryParams(conn
,
1812 "INSERT INTO pq_pipeline_tst VALUES (1)",
1813 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1814 pg_fatal("failed to send query: %s",
1815 PQerrorMessage(conn
));
1816 if (PQpipelineSync(conn
) != 1)
1817 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1821 * This insert fails even though the pipeline got a SYNC, because we're in
1822 * an aborted transaction
1824 if (PQsendQueryParams(conn
,
1825 "INSERT INTO pq_pipeline_tst VALUES (2)",
1826 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1827 pg_fatal("failed to send query: %s",
1828 PQerrorMessage(conn
));
1829 if (PQpipelineSync(conn
) != 1)
1830 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1834 * Send ROLLBACK using prepared stmt. This one works because we just did
1835 * PQpipelineSync above.
1837 if (PQsendQueryPrepared(conn
, "rollback", 0, NULL
, NULL
, NULL
, 1) != 1)
1838 pg_fatal("failed to execute prepared: %s",
1839 PQerrorMessage(conn
));
1842 * Now that we're out of a transaction and in pipeline-good mode, this
1845 if (PQsendQueryParams(conn
,
1846 "INSERT INTO pq_pipeline_tst VALUES (3)",
1847 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1848 pg_fatal("failed to send query: %s",
1849 PQerrorMessage(conn
));
1850 /* Send two syncs now -- match up to SYNC messages below */
1851 if (PQpipelineSync(conn
) != 1)
1852 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1854 if (PQpipelineSync(conn
) != 1)
1855 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1858 expect_null
= false;
1859 for (int i
= 0;; i
++)
1861 ExecStatusType restype
;
1863 res
= PQgetResult(conn
);
1866 printf("%d: got NULL result\n", i
);
1868 pg_fatal("did not expect NULL here");
1869 expect_null
= false;
1872 restype
= PQresultStatus(res
);
1873 printf("%d: got status %s", i
, PQresStatus(restype
));
1875 pg_fatal("expected NULL");
1876 if (restype
== PGRES_FATAL_ERROR
)
1877 printf("; error: %s", PQerrorMessage(conn
));
1878 else if (restype
== PGRES_PIPELINE_ABORTED
)
1880 printf(": command didn't run because pipeline aborted\n");
1886 if (restype
== PGRES_PIPELINE_SYNC
)
1893 if (PQgetResult(conn
) != NULL
)
1894 pg_fatal("returned something extra after all the syncs: %s",
1895 PQresStatus(PQresultStatus(res
)));
1897 if (PQexitPipelineMode(conn
) != 1)
1898 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn
));
1900 /* We expect to find one tuple containing the value "3" */
1901 res
= PQexec(conn
, "SELECT * FROM pq_pipeline_tst");
1902 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1903 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn
));
1904 if (PQntuples(res
) != 1)
1905 pg_fatal("did not get 1 tuple");
1906 if (strcmp(PQgetvalue(res
, 0, 0), "3") != 0)
1907 pg_fatal("did not get expected tuple");
1910 fprintf(stderr
, "ok\n");
1914 * In this test mode we send a stream of queries, with one in the middle
1915 * causing an error. Verify that we can still send some more after the
1916 * error and have libpq work properly.
1919 test_uniqviol(PGconn
*conn
)
1921 int sock
= PQsocket(conn
);
1923 Oid paramTypes
[2] = {INT8OID
, INT8OID
};
1924 const char *paramValues
[2];
1925 char paramValue0
[MAXINT8LEN
];
1926 char paramValue1
[MAXINT8LEN
];
1930 bool read_done
= false;
1931 bool write_done
= false;
1932 bool error_sent
= false;
1933 bool got_error
= false;
1939 fprintf(stderr
, "uniqviol ...");
1941 PQsetnonblocking(conn
, 1);
1943 paramValues
[0] = paramValue0
;
1944 paramValues
[1] = paramValue1
;
1945 sprintf(paramValue1
, "42");
1947 res
= PQexec(conn
, "drop table if exists ppln_uniqviol;"
1948 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1949 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1950 pg_fatal("failed to create table: %s", PQerrorMessage(conn
));
1952 res
= PQexec(conn
, "begin");
1953 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1954 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn
));
1956 res
= PQprepare(conn
, "insertion",
1957 "insert into ppln_uniqviol values ($1, $2) returning id",
1959 if (res
== NULL
|| PQresultStatus(res
) != PGRES_COMMAND_OK
)
1960 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn
));
1962 if (PQenterPipelineMode(conn
) != 1)
1963 pg_fatal("failed to enter pipeline mode");
1968 * Avoid deadlocks by reading everything the server has sent before
1969 * sending anything. (Special precaution is needed here to process
1970 * PQisBusy before testing the socket for read-readiness, because the
1971 * socket does not turn read-ready after "sending" queries in aborted
1974 while (PQisBusy(conn
) == 0)
1978 if (results
>= numsent
)
1985 res
= PQgetResult(conn
);
1986 new_error
= process_result(conn
, res
, results
, numsent
);
1987 if (new_error
&& got_error
)
1988 pg_fatal("got two errors");
1989 got_error
|= new_error
;
1990 if (results
++ >= numsent
- 1)
2002 FD_SET(sock
, &out_fds
);
2005 FD_SET(sock
, &in_fds
);
2007 if (select(sock
+ 1, &in_fds
, write_done
? NULL
: &out_fds
, NULL
, NULL
) == -1)
2011 pg_fatal("select() failed: %m");
2014 if (FD_ISSET(sock
, &in_fds
) && PQconsumeInput(conn
) == 0)
2015 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn
));
2018 * If the socket is writable and we haven't finished sending queries,
2021 if (!write_done
&& FD_ISSET(sock
, &out_fds
))
2028 * provoke uniqueness violation exactly once after having
2029 * switched to read mode.
2031 if (switched
>= 1 && !error_sent
&& ctr
% socketful
>= socketful
/ 2)
2033 sprintf(paramValue0
, "%d", numsent
/ 2);
2034 fprintf(stderr
, "E");
2039 fprintf(stderr
, ".");
2040 sprintf(paramValue0
, "%d", ctr
++);
2043 if (PQsendQueryPrepared(conn
, "insertion", 2, paramValues
, NULL
, NULL
, 0) != 1)
2044 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn
));
2047 /* Are we done writing? */
2048 if (socketful
!= 0 && numsent
% socketful
== 42 && error_sent
)
2050 if (PQsendFlushRequest(conn
) != 1)
2051 pg_fatal("failed to send flush request");
2053 fprintf(stderr
, "\ndone writing\n");
2058 /* is the outgoing socket full? */
2059 flush
= PQflush(conn
);
2061 pg_fatal("failed to flush: %s", PQerrorMessage(conn
));
2065 socketful
= numsent
;
2066 fprintf(stderr
, "\nswitch to reading\n");
2075 pg_fatal("did not get expected error");
2077 fprintf(stderr
, "ok\n");
2081 * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2082 * the expected NULL that should follow it.
2084 * Returns true if we read a fatal error message, otherwise false.
2087 process_result(PGconn
*conn
, PGresult
*res
, int results
, int numsent
)
2090 bool got_error
= false;
2093 pg_fatal("got unexpected NULL");
2095 switch (PQresultStatus(res
))
2097 case PGRES_FATAL_ERROR
:
2099 fprintf(stderr
, "result %d/%d (error): %s\n", results
, numsent
, PQerrorMessage(conn
));
2102 res2
= PQgetResult(conn
);
2104 pg_fatal("expected NULL, got %s",
2105 PQresStatus(PQresultStatus(res2
)));
2108 case PGRES_TUPLES_OK
:
2109 fprintf(stderr
, "result %d/%d: %s\n", results
, numsent
, PQgetvalue(res
, 0, 0));
2112 res2
= PQgetResult(conn
);
2114 pg_fatal("expected NULL, got %s",
2115 PQresStatus(PQresultStatus(res2
)));
2118 case PGRES_PIPELINE_ABORTED
:
2119 fprintf(stderr
, "result %d/%d: pipeline aborted\n", results
, numsent
);
2120 res2
= PQgetResult(conn
);
2122 pg_fatal("expected NULL, got %s",
2123 PQresStatus(PQresultStatus(res2
)));
2127 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res
)));
/*
 * Print the command-line help to stderr, using "progname" as the program
 * name in the summary line and in both usage lines.
 */
static void
usage(const char *progname)
{
	/* a single call; the pieces are joined by string-literal concatenation */
	fprintf(stderr,
			"%s tests libpq's pipeline mode.\n\n"
			"Usage:\n"
			" %s [OPTION] tests\n"
			" %s [OPTION] TESTNAME [CONNINFO]\n"
			"\nOptions:\n"
			" -t TRACEFILE generate a libpq trace to TRACEFILE\n"
			" -r NUMROWS use NUMROWS as the test size\n",
			progname, progname, progname);
}
/*
 * Print the names of all available tests to stdout, one per line, in
 * alphabetical order.  The list has to stay in step with the test names
 * that main() dispatches on.
 */
static void
print_test_list(void)
{
	static const char *const test_names[] = {
		"cancel",
		"disallowed_in_pipeline",
		"multi_pipelines",
		"nosync",
		"pipeline_abort",
		"pipeline_idle",
		"pipelined_insert",
		"prepared",
		"simple_pipeline",
		"singlerow",
		"transaction",
		"uniqviol",
	};

	for (size_t i = 0; i < sizeof(test_names) / sizeof(test_names[0]); i++)
		printf("%s\n", test_names[i]);
}
2164 main(int argc
, char **argv
)
2166 const char *conninfo
= "";
2170 int numrows
= 10000;
2174 while ((c
= getopt(argc
, argv
, "r:t:")) != -1)
2178 case 'r': /* numrows */
2180 numrows
= strtol(optarg
, NULL
, 10);
2181 if (errno
!= 0 || numrows
<= 0)
2183 fprintf(stderr
, "couldn't parse \"%s\" as a positive integer\n",
2188 case 't': /* trace file */
2189 tracefile
= pg_strdup(optarg
);
2196 testname
= pg_strdup(argv
[optind
]);
2205 if (strcmp(testname
, "tests") == 0)
2213 conninfo
= pg_strdup(argv
[optind
]);
2217 /* Make a connection to the database */
2218 conn
= PQconnectdb(conninfo
);
2219 if (PQstatus(conn
) != CONNECTION_OK
)
2221 fprintf(stderr
, "Connection to database failed: %s\n",
2222 PQerrorMessage(conn
));
2226 res
= PQexec(conn
, "SET lc_messages TO \"C\"");
2227 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
2228 pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn
));
2229 res
= PQexec(conn
, "SET debug_parallel_query = off");
2230 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
2231 pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn
));
2233 /* Set the trace file, if requested */
2234 if (tracefile
!= NULL
)
2236 if (strcmp(tracefile
, "-") == 0)
2239 trace
= fopen(tracefile
, "w");
2241 pg_fatal("could not open file \"%s\": %m", tracefile
);
2243 /* Make it line-buffered */
2244 setvbuf(trace
, NULL
, PG_IOLBF
, 0);
2246 PQtrace(conn
, trace
);
2247 PQsetTraceFlags(conn
,
2248 PQTRACE_SUPPRESS_TIMESTAMPS
| PQTRACE_REGRESS_MODE
);
2251 if (strcmp(testname
, "cancel") == 0)
2253 else if (strcmp(testname
, "disallowed_in_pipeline") == 0)
2254 test_disallowed_in_pipeline(conn
);
2255 else if (strcmp(testname
, "multi_pipelines") == 0)
2256 test_multi_pipelines(conn
);
2257 else if (strcmp(testname
, "nosync") == 0)
2259 else if (strcmp(testname
, "pipeline_abort") == 0)
2260 test_pipeline_abort(conn
);
2261 else if (strcmp(testname
, "pipeline_idle") == 0)
2262 test_pipeline_idle(conn
);
2263 else if (strcmp(testname
, "pipelined_insert") == 0)
2264 test_pipelined_insert(conn
, numrows
);
2265 else if (strcmp(testname
, "prepared") == 0)
2266 test_prepared(conn
);
2267 else if (strcmp(testname
, "simple_pipeline") == 0)
2268 test_simple_pipeline(conn
);
2269 else if (strcmp(testname
, "singlerow") == 0)
2270 test_singlerowmode(conn
);
2271 else if (strcmp(testname
, "transaction") == 0)
2272 test_transaction(conn
);
2273 else if (strcmp(testname
, "uniqviol") == 0)
2274 test_uniqviol(conn
);
2277 fprintf(stderr
, "\"%s\" is not a recognized test name\n", testname
);
2281 /* close the connection to the database and cleanup */