Doc: add commentary about cowboy assignment of maintenance_work_mem.
[pgsql.git] / src / bin / psql / crosstabview.c
blob3e5cd0a96d4fa90aa480a277ae53e8a08c05dc7a
/*
 * psql - the PostgreSQL interactive terminal
 *
 * Copyright (c) 2000-2025, PostgreSQL Global Development Group
 *
 * src/bin/psql/crosstabview.c
 */
#include "postgres_fe.h"

#include <limits.h>

#include "common.h"
#include "common/int.h"
#include "common/logging.h"
#include "crosstabview.h"
#include "pqexpbuffer.h"
#include "psqlscanslash.h"
#include "settings.h"
/*
 * One entry of the horizontal or vertical crosstabview header: a value taken
 * from the result set together with its display position.
 */
typedef struct _pivot_field
{
	/*
	 * Header text: the pointer returned by PQgetvalue() for colV or colH.
	 * Every distinct value yields one entry in the vertical header (colV) or
	 * in the horizontal header (colH).  A NULL pointer stands for SQL NULL.
	 */
	char	   *name;

	/*
	 * Only meaningful when sorting on an alternative column was requested:
	 * the PQgetvalue() result of the sort column for the row that supplied
	 * <name>.  If <name> appears several times in the result set, the value
	 * from its first occurrence is the one retained.  A NULL pointer stands
	 * for SQL NULL.
	 */
	char	   *sort_value;

	/*
	 * Zero-based rank of the value.  It starts out as the order of first
	 * appearance of <name> in the result set; e.g. rows B,A,C,A,D give
	 * B:0, A:1, C:2, D:3.  If a sort column was specified, a final pass
	 * rewrites the ranks to reflect the requested order.
	 */
	int			rank;
} pivot_field;
49 /* Node in avl_tree */
50 typedef struct _avl_node
52 /* Node contents */
53 pivot_field field;
56 * Height of this node in the tree (number of nodes on the longest path to
57 * a leaf).
59 int height;
62 * Child nodes. [0] points to left subtree, [1] to right subtree. Never
63 * NULL, points to the empty node avl_tree.end when no left or right
64 * value.
66 struct _avl_node *children[2];
67 } avl_node;
70 * Control structure for the AVL tree (binary search tree kept
71 * balanced with the AVL algorithm)
73 typedef struct _avl_tree
75 int count; /* Total number of nodes */
76 avl_node *root; /* root of the tree */
77 avl_node *end; /* Immutable dereferenceable empty tree */
78 } avl_tree;
81 static bool printCrosstab(const PGresult *result,
82 int num_columns, pivot_field *piv_columns, int field_for_columns,
83 int num_rows, pivot_field *piv_rows, int field_for_rows,
84 int field_for_data);
85 static void avlInit(avl_tree *tree);
86 static void avlMergeValue(avl_tree *tree, char *name, char *sort_value);
87 static int avlCollectFields(avl_tree *tree, avl_node *node,
88 pivot_field *fields, int idx);
89 static void avlFree(avl_tree *tree, avl_node *node);
90 static void rankSort(int num_columns, pivot_field *piv_columns);
91 static int indexOfColumn(char *arg, const PGresult *res);
92 static int pivotFieldCompare(const void *a, const void *b);
93 static int rankCompare(const void *a, const void *b);
97 * Main entry point to this module.
99 * Process the data from *res according to the options in pset (global),
100 * to generate the horizontal and vertical headers contents,
101 * then call printCrosstab() for the actual output.
103 bool
104 PrintResultInCrosstab(const PGresult *res)
106 bool retval = false;
107 avl_tree piv_columns;
108 avl_tree piv_rows;
109 pivot_field *array_columns = NULL;
110 pivot_field *array_rows = NULL;
111 int num_columns = 0;
112 int num_rows = 0;
113 int field_for_rows;
114 int field_for_columns;
115 int field_for_data;
116 int sort_field_for_columns;
117 int rn;
119 avlInit(&piv_rows);
120 avlInit(&piv_columns);
122 if (PQresultStatus(res) != PGRES_TUPLES_OK)
124 pg_log_error("\\crosstabview: statement did not return a result set");
125 goto error_return;
128 if (PQnfields(res) < 3)
130 pg_log_error("\\crosstabview: query must return at least three columns");
131 goto error_return;
134 /* Process first optional arg (vertical header column) */
135 if (pset.ctv_args[0] == NULL)
136 field_for_rows = 0;
137 else
139 field_for_rows = indexOfColumn(pset.ctv_args[0], res);
140 if (field_for_rows < 0)
141 goto error_return;
144 /* Process second optional arg (horizontal header column) */
145 if (pset.ctv_args[1] == NULL)
146 field_for_columns = 1;
147 else
149 field_for_columns = indexOfColumn(pset.ctv_args[1], res);
150 if (field_for_columns < 0)
151 goto error_return;
154 /* Insist that header columns be distinct */
155 if (field_for_columns == field_for_rows)
157 pg_log_error("\\crosstabview: vertical and horizontal headers must be different columns");
158 goto error_return;
161 /* Process third optional arg (data column) */
162 if (pset.ctv_args[2] == NULL)
164 int i;
167 * If the data column was not specified, we search for the one not
168 * used as either vertical or horizontal headers. Must be exactly
169 * three columns, or this won't be unique.
171 if (PQnfields(res) != 3)
173 pg_log_error("\\crosstabview: data column must be specified when query returns more than three columns");
174 goto error_return;
177 field_for_data = -1;
178 for (i = 0; i < PQnfields(res); i++)
180 if (i != field_for_rows && i != field_for_columns)
182 field_for_data = i;
183 break;
186 Assert(field_for_data >= 0);
188 else
190 field_for_data = indexOfColumn(pset.ctv_args[2], res);
191 if (field_for_data < 0)
192 goto error_return;
195 /* Process fourth optional arg (horizontal header sort column) */
196 if (pset.ctv_args[3] == NULL)
197 sort_field_for_columns = -1; /* no sort column */
198 else
200 sort_field_for_columns = indexOfColumn(pset.ctv_args[3], res);
201 if (sort_field_for_columns < 0)
202 goto error_return;
206 * First part: accumulate the names that go into the vertical and
207 * horizontal headers, each into an AVL binary tree to build the set of
208 * DISTINCT values.
211 for (rn = 0; rn < PQntuples(res); rn++)
213 char *val;
214 char *val1;
216 /* horizontal */
217 val = PQgetisnull(res, rn, field_for_columns) ? NULL :
218 PQgetvalue(res, rn, field_for_columns);
219 val1 = NULL;
221 if (sort_field_for_columns >= 0 &&
222 !PQgetisnull(res, rn, sort_field_for_columns))
223 val1 = PQgetvalue(res, rn, sort_field_for_columns);
225 avlMergeValue(&piv_columns, val, val1);
227 if (piv_columns.count > CROSSTABVIEW_MAX_COLUMNS)
229 pg_log_error("\\crosstabview: maximum number of columns (%d) exceeded",
230 CROSSTABVIEW_MAX_COLUMNS);
231 goto error_return;
234 /* vertical */
235 val = PQgetisnull(res, rn, field_for_rows) ? NULL :
236 PQgetvalue(res, rn, field_for_rows);
238 avlMergeValue(&piv_rows, val, NULL);
242 * Second part: Generate sorted arrays from the AVL trees.
245 num_columns = piv_columns.count;
246 num_rows = piv_rows.count;
248 array_columns = (pivot_field *)
249 pg_malloc(sizeof(pivot_field) * num_columns);
251 array_rows = (pivot_field *)
252 pg_malloc(sizeof(pivot_field) * num_rows);
254 avlCollectFields(&piv_columns, piv_columns.root, array_columns, 0);
255 avlCollectFields(&piv_rows, piv_rows.root, array_rows, 0);
258 * Third part: optionally, process the ranking data for the horizontal
259 * header
261 if (sort_field_for_columns >= 0)
262 rankSort(num_columns, array_columns);
265 * Fourth part: print the crosstab'ed result.
267 retval = printCrosstab(res,
268 num_columns, array_columns, field_for_columns,
269 num_rows, array_rows, field_for_rows,
270 field_for_data);
272 error_return:
273 avlFree(&piv_columns, piv_columns.root);
274 avlFree(&piv_rows, piv_rows.root);
275 pg_free(array_columns);
276 pg_free(array_rows);
278 return retval;
282 * Output the pivoted resultset with the printTable* functions. Return true
283 * if successful, false otherwise.
285 static bool
286 printCrosstab(const PGresult *result,
287 int num_columns, pivot_field *piv_columns, int field_for_columns,
288 int num_rows, pivot_field *piv_rows, int field_for_rows,
289 int field_for_data)
291 printQueryOpt popt = pset.popt;
292 printTableContent cont;
293 int i,
295 char col_align;
296 int *horiz_map;
297 bool retval = false;
299 printTableInit(&cont, &popt.topt, popt.title, num_columns + 1, num_rows);
301 /* Step 1: set target column names (horizontal header) */
303 /* The name of the first column is kept unchanged by the pivoting */
304 printTableAddHeader(&cont,
305 PQfname(result, field_for_rows),
306 false,
307 column_type_alignment(PQftype(result,
308 field_for_rows)));
311 * To iterate over piv_columns[] by piv_columns[].rank, create a reverse
312 * map associating each piv_columns[].rank to its index in piv_columns.
313 * This avoids an O(N^2) loop later.
315 horiz_map = (int *) pg_malloc(sizeof(int) * num_columns);
316 for (i = 0; i < num_columns; i++)
317 horiz_map[piv_columns[i].rank] = i;
320 * The display alignment depends on its PQftype().
322 col_align = column_type_alignment(PQftype(result, field_for_data));
324 for (i = 0; i < num_columns; i++)
326 char *colname;
328 colname = piv_columns[horiz_map[i]].name ?
329 piv_columns[horiz_map[i]].name :
330 (popt.nullPrint ? popt.nullPrint : "");
332 printTableAddHeader(&cont, colname, false, col_align);
334 pg_free(horiz_map);
336 /* Step 2: set row names in the first output column (vertical header) */
337 for (i = 0; i < num_rows; i++)
339 int k = piv_rows[i].rank;
341 cont.cells[k * (num_columns + 1)] = piv_rows[i].name ?
342 piv_rows[i].name :
343 (popt.nullPrint ? popt.nullPrint : "");
345 cont.cellsadded = num_rows * (num_columns + 1);
348 * Step 3: fill in the content cells.
350 for (rn = 0; rn < PQntuples(result); rn++)
352 int row_number;
353 int col_number;
354 pivot_field *rp,
355 *cp;
356 pivot_field elt;
358 /* Find target row */
359 if (!PQgetisnull(result, rn, field_for_rows))
360 elt.name = PQgetvalue(result, rn, field_for_rows);
361 else
362 elt.name = NULL;
363 rp = (pivot_field *) bsearch(&elt,
364 piv_rows,
365 num_rows,
366 sizeof(pivot_field),
367 pivotFieldCompare);
368 Assert(rp != NULL);
369 row_number = rp->rank;
371 /* Find target column */
372 if (!PQgetisnull(result, rn, field_for_columns))
373 elt.name = PQgetvalue(result, rn, field_for_columns);
374 else
375 elt.name = NULL;
377 cp = (pivot_field *) bsearch(&elt,
378 piv_columns,
379 num_columns,
380 sizeof(pivot_field),
381 pivotFieldCompare);
382 Assert(cp != NULL);
383 col_number = cp->rank;
385 /* Place value into cell */
386 if (col_number >= 0 && row_number >= 0)
388 int idx;
390 /* index into the cont.cells array */
391 idx = 1 + col_number + row_number * (num_columns + 1);
394 * If the cell already contains a value, raise an error.
396 if (cont.cells[idx] != NULL)
398 pg_log_error("\\crosstabview: query result contains multiple data values for row \"%s\", column \"%s\"",
399 rp->name ? rp->name :
400 (popt.nullPrint ? popt.nullPrint : "(null)"),
401 cp->name ? cp->name :
402 (popt.nullPrint ? popt.nullPrint : "(null)"));
403 goto error;
406 cont.cells[idx] = !PQgetisnull(result, rn, field_for_data) ?
407 PQgetvalue(result, rn, field_for_data) :
408 (popt.nullPrint ? popt.nullPrint : "");
413 * The non-initialized cells must be set to an empty string for the print
414 * functions
416 for (i = 0; i < cont.cellsadded; i++)
418 if (cont.cells[i] == NULL)
419 cont.cells[i] = "";
422 printTable(&cont, pset.queryFout, false, pset.logfile);
423 retval = true;
425 error:
426 printTableCleanup(&cont);
428 return retval;
432 * The avl* functions below provide a minimalistic implementation of AVL binary
433 * trees, to efficiently collect the distinct values that will form the horizontal
434 * and vertical headers. It only supports adding new values, no removal or even
435 * search.
437 static void
438 avlInit(avl_tree *tree)
440 tree->end = (avl_node *) pg_malloc0(sizeof(avl_node));
441 tree->end->children[0] = tree->end->children[1] = tree->end;
442 tree->count = 0;
443 tree->root = tree->end;
446 /* Deallocate recursively an AVL tree, starting from node */
447 static void
448 avlFree(avl_tree *tree, avl_node *node)
450 if (node->children[0] != tree->end)
452 avlFree(tree, node->children[0]);
453 pg_free(node->children[0]);
455 if (node->children[1] != tree->end)
457 avlFree(tree, node->children[1]);
458 pg_free(node->children[1]);
460 if (node == tree->root)
462 /* free the root separately as it's not child of anything */
463 if (node != tree->end)
464 pg_free(node);
465 /* free the tree->end struct only once and when all else is freed */
466 pg_free(tree->end);
470 /* Set the height to 1 plus the greatest of left and right heights */
471 static void
472 avlUpdateHeight(avl_node *n)
474 n->height = 1 + (n->children[0]->height > n->children[1]->height ?
475 n->children[0]->height :
476 n->children[1]->height);
479 /* Rotate a subtree left (dir=0) or right (dir=1). Not recursive */
480 static avl_node *
481 avlRotate(avl_node **current, int dir)
483 avl_node *before = *current;
484 avl_node *after = (*current)->children[dir];
486 *current = after;
487 before->children[dir] = after->children[!dir];
488 avlUpdateHeight(before);
489 after->children[!dir] = before;
491 return after;
494 static int
495 avlBalance(avl_node *n)
497 return n->children[0]->height - n->children[1]->height;
501 * After an insertion, possibly rebalance the tree so that the left and right
502 * node heights don't differ by more than 1.
503 * May update *node.
505 static void
506 avlAdjustBalance(avl_tree *tree, avl_node **node)
508 avl_node *current = *node;
509 int b = avlBalance(current) / 2;
511 if (b != 0)
513 int dir = (1 - b) / 2;
515 if (avlBalance(current->children[dir]) == -b)
516 avlRotate(&current->children[dir], !dir);
517 current = avlRotate(node, dir);
519 if (current != tree->end)
520 avlUpdateHeight(current);
524 * Insert a new value/field, starting from *node, reaching the correct position
525 * in the tree by recursion. Possibly rebalance the tree and possibly update
526 * *node. Do nothing if the value is already present in the tree.
528 static void
529 avlInsertNode(avl_tree *tree, avl_node **node, pivot_field field)
531 avl_node *current = *node;
533 if (current == tree->end)
535 avl_node *new_node = (avl_node *)
536 pg_malloc(sizeof(avl_node));
538 new_node->height = 1;
539 new_node->field = field;
540 new_node->children[0] = new_node->children[1] = tree->end;
541 tree->count++;
542 *node = new_node;
544 else
546 int cmp = pivotFieldCompare(&field, &current->field);
548 if (cmp != 0)
550 avlInsertNode(tree,
551 cmp > 0 ? &current->children[1] : &current->children[0],
552 field);
553 avlAdjustBalance(tree, node);
558 /* Insert the value into the AVL tree, if it does not preexist */
559 static void
560 avlMergeValue(avl_tree *tree, char *name, char *sort_value)
562 pivot_field field;
564 field.name = name;
565 field.rank = tree->count;
566 field.sort_value = sort_value;
567 avlInsertNode(tree, &tree->root, field);
571 * Recursively extract node values into the names array, in sorted order with a
572 * left-to-right tree traversal.
573 * Return the next candidate offset to write into the names array.
574 * fields[] must be preallocated to hold tree->count entries
576 static int
577 avlCollectFields(avl_tree *tree, avl_node *node, pivot_field *fields, int idx)
579 if (node == tree->end)
580 return idx;
582 idx = avlCollectFields(tree, node->children[0], fields, idx);
583 fields[idx] = node->field;
584 return avlCollectFields(tree, node->children[1], fields, idx + 1);
587 static void
588 rankSort(int num_columns, pivot_field *piv_columns)
590 int *hmap; /* [[offset in piv_columns, rank], ...for
591 * every header entry] */
592 int i;
594 hmap = (int *) pg_malloc(sizeof(int) * num_columns * 2);
595 for (i = 0; i < num_columns; i++)
597 char *val = piv_columns[i].sort_value;
599 /* ranking information is valid if non null and matches /^-?\d+$/ */
600 if (val &&
601 ((*val == '-' &&
602 strspn(val + 1, "0123456789") == strlen(val + 1)) ||
603 strspn(val, "0123456789") == strlen(val)))
605 hmap[i * 2] = atoi(val);
606 hmap[i * 2 + 1] = i;
608 else
610 /* invalid rank information ignored (equivalent to rank 0) */
611 hmap[i * 2] = 0;
612 hmap[i * 2 + 1] = i;
616 qsort(hmap, num_columns, sizeof(int) * 2, rankCompare);
618 for (i = 0; i < num_columns; i++)
620 piv_columns[hmap[i * 2 + 1]].rank = i;
623 pg_free(hmap);
627 * Look up a column reference, which can be either:
628 * - a number from 1 to PQnfields(res)
629 * - a column name matching one of PQfname(res,...)
631 * Returns zero-based column number, or -1 if not found or ambiguous.
633 * Note: may modify contents of "arg" string.
635 static int
636 indexOfColumn(char *arg, const PGresult *res)
638 int idx;
640 if (arg[0] && strspn(arg, "0123456789") == strlen(arg))
642 /* if arg contains only digits, it's a column number */
643 idx = atoi(arg) - 1;
644 if (idx < 0 || idx >= PQnfields(res))
646 pg_log_error("\\crosstabview: column number %d is out of range 1..%d",
647 idx + 1, PQnfields(res));
648 return -1;
651 else
653 int i;
656 * Dequote and downcase the column name. By checking for all-digits
657 * before doing this, we can ensure that a quoted name is treated as a
658 * name even if it's all digits.
660 dequote_downcase_identifier(arg, true, pset.encoding);
662 /* Now look for match(es) among res' column names */
663 idx = -1;
664 for (i = 0; i < PQnfields(res); i++)
666 if (strcmp(arg, PQfname(res, i)) == 0)
668 if (idx >= 0)
670 /* another idx was already found for the same name */
671 pg_log_error("\\crosstabview: ambiguous column name: \"%s\"", arg);
672 return -1;
674 idx = i;
677 if (idx == -1)
679 pg_log_error("\\crosstabview: column name not found: \"%s\"", arg);
680 return -1;
684 return idx;
688 * Value comparator for vertical and horizontal headers
689 * used for deduplication only.
690 * - null values are considered equal
691 * - non-null < null
692 * - non-null values are compared with strcmp()
694 static int
695 pivotFieldCompare(const void *a, const void *b)
697 const pivot_field *pa = (const pivot_field *) a;
698 const pivot_field *pb = (const pivot_field *) b;
700 /* test null values */
701 if (!pb->name)
702 return pa->name ? -1 : 0;
703 else if (!pa->name)
704 return 1;
706 /* non-null values */
707 return strcmp(pa->name, pb->name);
/*
 * qsort comparator for the pairs of ints built by rankSort(): the first int
 * of each pair is the sort key, the second the entry's offset in
 * piv_columns[].
 *
 * Pairs are ordered by sort key.  qsort() makes no promise about the
 * relative order of elements that compare equal, so equal keys are ordered
 * by offset to keep the resulting column order deterministic.
 */
static int
rankCompare(const void *a, const void *b)
{
	const int  *pair_a = (const int *) a;
	const int  *pair_b = (const int *) b;
	int			cmp = pg_cmp_s32(pair_a[0], pair_b[0]);

	if (cmp != 0)
		return cmp;

	return pg_cmp_s32(pair_a[1], pair_b[1]);
}