/*
 * psql - the PostgreSQL interactive terminal
 *
 * Copyright (c) 2000-2025, PostgreSQL Global Development Group
 *
 * src/bin/psql/crosstabview.c
 */
8 #include "postgres_fe.h"
11 #include "common/int.h"
12 #include "common/logging.h"
13 #include "crosstabview.h"
14 #include "pqexpbuffer.h"
15 #include "psqlscanslash.h"
/*
 * Value/position from the resultset that goes into the horizontal or vertical
 * crosstabview header.
 */
typedef struct _pivot_field
{
	/*
	 * Pointer obtained from PQgetvalue() for colV or colH. Each distinct
	 * value becomes an entry in the vertical header (colV), or horizontal
	 * header (colH). A Null value is represented by a NULL pointer.
	 */
	char	   *name;

	/*
	 * When a sort is requested on an alternative column, this holds
	 * PQgetvalue() for the sort column corresponding to <name>. If <name>
	 * appears multiple times, it's the first value in the order of the
	 * results that is kept. A Null value is represented by a NULL pointer.
	 */
	char	   *sort_value;

	/*
	 * Rank of this value, starting at 0. Initially, it's the relative
	 * position of the first appearance of <name> in the resultset. For
	 * example, if successive rows contain B,A,C,A,D then it's B:0,A:1,C:2,D:3
	 * When a sort column is specified, ranks get updated in a final pass to
	 * reflect the desired order.
	 */
	int			rank;
} pivot_field;
49 /* Node in avl_tree */
50 typedef struct _avl_node
56 * Height of this node in the tree (number of nodes on the longest path to
62 * Child nodes. [0] points to left subtree, [1] to right subtree. Never
63 * NULL, points to the empty node avl_tree.end when no left or right
66 struct _avl_node
*children
[2];
70 * Control structure for the AVL tree (binary search tree kept
71 * balanced with the AVL algorithm)
73 typedef struct _avl_tree
75 int count
; /* Total number of nodes */
76 avl_node
*root
; /* root of the tree */
77 avl_node
*end
; /* Immutable dereferenceable empty tree */
81 static bool printCrosstab(const PGresult
*result
,
82 int num_columns
, pivot_field
*piv_columns
, int field_for_columns
,
83 int num_rows
, pivot_field
*piv_rows
, int field_for_rows
,
85 static void avlInit(avl_tree
*tree
);
86 static void avlMergeValue(avl_tree
*tree
, char *name
, char *sort_value
);
87 static int avlCollectFields(avl_tree
*tree
, avl_node
*node
,
88 pivot_field
*fields
, int idx
);
89 static void avlFree(avl_tree
*tree
, avl_node
*node
);
90 static void rankSort(int num_columns
, pivot_field
*piv_columns
);
91 static int indexOfColumn(char *arg
, const PGresult
*res
);
92 static int pivotFieldCompare(const void *a
, const void *b
);
93 static int rankCompare(const void *a
, const void *b
);
97 * Main entry point to this module.
99 * Process the data from *res according to the options in pset (global),
100 * to generate the horizontal and vertical headers contents,
101 * then call printCrosstab() for the actual output.
104 PrintResultInCrosstab(const PGresult
*res
)
107 avl_tree piv_columns
;
109 pivot_field
*array_columns
= NULL
;
110 pivot_field
*array_rows
= NULL
;
114 int field_for_columns
;
116 int sort_field_for_columns
;
120 avlInit(&piv_columns
);
122 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
124 pg_log_error("\\crosstabview: statement did not return a result set");
128 if (PQnfields(res
) < 3)
130 pg_log_error("\\crosstabview: query must return at least three columns");
134 /* Process first optional arg (vertical header column) */
135 if (pset
.ctv_args
[0] == NULL
)
139 field_for_rows
= indexOfColumn(pset
.ctv_args
[0], res
);
140 if (field_for_rows
< 0)
144 /* Process second optional arg (horizontal header column) */
145 if (pset
.ctv_args
[1] == NULL
)
146 field_for_columns
= 1;
149 field_for_columns
= indexOfColumn(pset
.ctv_args
[1], res
);
150 if (field_for_columns
< 0)
154 /* Insist that header columns be distinct */
155 if (field_for_columns
== field_for_rows
)
157 pg_log_error("\\crosstabview: vertical and horizontal headers must be different columns");
161 /* Process third optional arg (data column) */
162 if (pset
.ctv_args
[2] == NULL
)
167 * If the data column was not specified, we search for the one not
168 * used as either vertical or horizontal headers. Must be exactly
169 * three columns, or this won't be unique.
171 if (PQnfields(res
) != 3)
173 pg_log_error("\\crosstabview: data column must be specified when query returns more than three columns");
178 for (i
= 0; i
< PQnfields(res
); i
++)
180 if (i
!= field_for_rows
&& i
!= field_for_columns
)
186 Assert(field_for_data
>= 0);
190 field_for_data
= indexOfColumn(pset
.ctv_args
[2], res
);
191 if (field_for_data
< 0)
195 /* Process fourth optional arg (horizontal header sort column) */
196 if (pset
.ctv_args
[3] == NULL
)
197 sort_field_for_columns
= -1; /* no sort column */
200 sort_field_for_columns
= indexOfColumn(pset
.ctv_args
[3], res
);
201 if (sort_field_for_columns
< 0)
206 * First part: accumulate the names that go into the vertical and
207 * horizontal headers, each into an AVL binary tree to build the set of
211 for (rn
= 0; rn
< PQntuples(res
); rn
++)
217 val
= PQgetisnull(res
, rn
, field_for_columns
) ? NULL
:
218 PQgetvalue(res
, rn
, field_for_columns
);
221 if (sort_field_for_columns
>= 0 &&
222 !PQgetisnull(res
, rn
, sort_field_for_columns
))
223 val1
= PQgetvalue(res
, rn
, sort_field_for_columns
);
225 avlMergeValue(&piv_columns
, val
, val1
);
227 if (piv_columns
.count
> CROSSTABVIEW_MAX_COLUMNS
)
229 pg_log_error("\\crosstabview: maximum number of columns (%d) exceeded",
230 CROSSTABVIEW_MAX_COLUMNS
);
235 val
= PQgetisnull(res
, rn
, field_for_rows
) ? NULL
:
236 PQgetvalue(res
, rn
, field_for_rows
);
238 avlMergeValue(&piv_rows
, val
, NULL
);
242 * Second part: Generate sorted arrays from the AVL trees.
245 num_columns
= piv_columns
.count
;
246 num_rows
= piv_rows
.count
;
248 array_columns
= (pivot_field
*)
249 pg_malloc(sizeof(pivot_field
) * num_columns
);
251 array_rows
= (pivot_field
*)
252 pg_malloc(sizeof(pivot_field
) * num_rows
);
254 avlCollectFields(&piv_columns
, piv_columns
.root
, array_columns
, 0);
255 avlCollectFields(&piv_rows
, piv_rows
.root
, array_rows
, 0);
258 * Third part: optionally, process the ranking data for the horizontal
261 if (sort_field_for_columns
>= 0)
262 rankSort(num_columns
, array_columns
);
265 * Fourth part: print the crosstab'ed result.
267 retval
= printCrosstab(res
,
268 num_columns
, array_columns
, field_for_columns
,
269 num_rows
, array_rows
, field_for_rows
,
273 avlFree(&piv_columns
, piv_columns
.root
);
274 avlFree(&piv_rows
, piv_rows
.root
);
275 pg_free(array_columns
);
282 * Output the pivoted resultset with the printTable* functions. Return true
283 * if successful, false otherwise.
286 printCrosstab(const PGresult
*result
,
287 int num_columns
, pivot_field
*piv_columns
, int field_for_columns
,
288 int num_rows
, pivot_field
*piv_rows
, int field_for_rows
,
291 printQueryOpt popt
= pset
.popt
;
292 printTableContent cont
;
299 printTableInit(&cont
, &popt
.topt
, popt
.title
, num_columns
+ 1, num_rows
);
301 /* Step 1: set target column names (horizontal header) */
303 /* The name of the first column is kept unchanged by the pivoting */
304 printTableAddHeader(&cont
,
305 PQfname(result
, field_for_rows
),
307 column_type_alignment(PQftype(result
,
311 * To iterate over piv_columns[] by piv_columns[].rank, create a reverse
312 * map associating each piv_columns[].rank to its index in piv_columns.
313 * This avoids an O(N^2) loop later.
315 horiz_map
= (int *) pg_malloc(sizeof(int) * num_columns
);
316 for (i
= 0; i
< num_columns
; i
++)
317 horiz_map
[piv_columns
[i
].rank
] = i
;
320 * The display alignment depends on its PQftype().
322 col_align
= column_type_alignment(PQftype(result
, field_for_data
));
324 for (i
= 0; i
< num_columns
; i
++)
328 colname
= piv_columns
[horiz_map
[i
]].name
?
329 piv_columns
[horiz_map
[i
]].name
:
330 (popt
.nullPrint
? popt
.nullPrint
: "");
332 printTableAddHeader(&cont
, colname
, false, col_align
);
336 /* Step 2: set row names in the first output column (vertical header) */
337 for (i
= 0; i
< num_rows
; i
++)
339 int k
= piv_rows
[i
].rank
;
341 cont
.cells
[k
* (num_columns
+ 1)] = piv_rows
[i
].name
?
343 (popt
.nullPrint
? popt
.nullPrint
: "");
345 cont
.cellsadded
= num_rows
* (num_columns
+ 1);
348 * Step 3: fill in the content cells.
350 for (rn
= 0; rn
< PQntuples(result
); rn
++)
358 /* Find target row */
359 if (!PQgetisnull(result
, rn
, field_for_rows
))
360 elt
.name
= PQgetvalue(result
, rn
, field_for_rows
);
363 rp
= (pivot_field
*) bsearch(&elt
,
369 row_number
= rp
->rank
;
371 /* Find target column */
372 if (!PQgetisnull(result
, rn
, field_for_columns
))
373 elt
.name
= PQgetvalue(result
, rn
, field_for_columns
);
377 cp
= (pivot_field
*) bsearch(&elt
,
383 col_number
= cp
->rank
;
385 /* Place value into cell */
386 if (col_number
>= 0 && row_number
>= 0)
390 /* index into the cont.cells array */
391 idx
= 1 + col_number
+ row_number
* (num_columns
+ 1);
394 * If the cell already contains a value, raise an error.
396 if (cont
.cells
[idx
] != NULL
)
398 pg_log_error("\\crosstabview: query result contains multiple data values for row \"%s\", column \"%s\"",
399 rp
->name
? rp
->name
:
400 (popt
.nullPrint
? popt
.nullPrint
: "(null)"),
401 cp
->name
? cp
->name
:
402 (popt
.nullPrint
? popt
.nullPrint
: "(null)"));
406 cont
.cells
[idx
] = !PQgetisnull(result
, rn
, field_for_data
) ?
407 PQgetvalue(result
, rn
, field_for_data
) :
408 (popt
.nullPrint
? popt
.nullPrint
: "");
413 * The non-initialized cells must be set to an empty string for the print
416 for (i
= 0; i
< cont
.cellsadded
; i
++)
418 if (cont
.cells
[i
] == NULL
)
422 printTable(&cont
, pset
.queryFout
, false, pset
.logfile
);
426 printTableCleanup(&cont
);
432 * The avl* functions below provide a minimalistic implementation of AVL binary
433 * trees, to efficiently collect the distinct values that will form the horizontal
434 * and vertical headers. It only supports adding new values, no removal or even
438 avlInit(avl_tree
*tree
)
440 tree
->end
= (avl_node
*) pg_malloc0(sizeof(avl_node
));
441 tree
->end
->children
[0] = tree
->end
->children
[1] = tree
->end
;
443 tree
->root
= tree
->end
;
446 /* Deallocate recursively an AVL tree, starting from node */
448 avlFree(avl_tree
*tree
, avl_node
*node
)
450 if (node
->children
[0] != tree
->end
)
452 avlFree(tree
, node
->children
[0]);
453 pg_free(node
->children
[0]);
455 if (node
->children
[1] != tree
->end
)
457 avlFree(tree
, node
->children
[1]);
458 pg_free(node
->children
[1]);
460 if (node
== tree
->root
)
462 /* free the root separately as it's not child of anything */
463 if (node
!= tree
->end
)
465 /* free the tree->end struct only once and when all else is freed */
470 /* Set the height to 1 plus the greatest of left and right heights */
472 avlUpdateHeight(avl_node
*n
)
474 n
->height
= 1 + (n
->children
[0]->height
> n
->children
[1]->height
?
475 n
->children
[0]->height
:
476 n
->children
[1]->height
);
479 /* Rotate a subtree left (dir=0) or right (dir=1). Not recursive */
481 avlRotate(avl_node
**current
, int dir
)
483 avl_node
*before
= *current
;
484 avl_node
*after
= (*current
)->children
[dir
];
487 before
->children
[dir
] = after
->children
[!dir
];
488 avlUpdateHeight(before
);
489 after
->children
[!dir
] = before
;
495 avlBalance(avl_node
*n
)
497 return n
->children
[0]->height
- n
->children
[1]->height
;
501 * After an insertion, possibly rebalance the tree so that the left and right
502 * node heights don't differ by more than 1.
506 avlAdjustBalance(avl_tree
*tree
, avl_node
**node
)
508 avl_node
*current
= *node
;
509 int b
= avlBalance(current
) / 2;
513 int dir
= (1 - b
) / 2;
515 if (avlBalance(current
->children
[dir
]) == -b
)
516 avlRotate(¤t
->children
[dir
], !dir
);
517 current
= avlRotate(node
, dir
);
519 if (current
!= tree
->end
)
520 avlUpdateHeight(current
);
524 * Insert a new value/field, starting from *node, reaching the correct position
525 * in the tree by recursion. Possibly rebalance the tree and possibly update
526 * *node. Do nothing if the value is already present in the tree.
529 avlInsertNode(avl_tree
*tree
, avl_node
**node
, pivot_field field
)
531 avl_node
*current
= *node
;
533 if (current
== tree
->end
)
535 avl_node
*new_node
= (avl_node
*)
536 pg_malloc(sizeof(avl_node
));
538 new_node
->height
= 1;
539 new_node
->field
= field
;
540 new_node
->children
[0] = new_node
->children
[1] = tree
->end
;
546 int cmp
= pivotFieldCompare(&field
, ¤t
->field
);
551 cmp
> 0 ? ¤t
->children
[1] : ¤t
->children
[0],
553 avlAdjustBalance(tree
, node
);
558 /* Insert the value into the AVL tree, if it does not preexist */
560 avlMergeValue(avl_tree
*tree
, char *name
, char *sort_value
)
565 field
.rank
= tree
->count
;
566 field
.sort_value
= sort_value
;
567 avlInsertNode(tree
, &tree
->root
, field
);
571 * Recursively extract node values into the names array, in sorted order with a
572 * left-to-right tree traversal.
573 * Return the next candidate offset to write into the names array.
574 * fields[] must be preallocated to hold tree->count entries
577 avlCollectFields(avl_tree
*tree
, avl_node
*node
, pivot_field
*fields
, int idx
)
579 if (node
== tree
->end
)
582 idx
= avlCollectFields(tree
, node
->children
[0], fields
, idx
);
583 fields
[idx
] = node
->field
;
584 return avlCollectFields(tree
, node
->children
[1], fields
, idx
+ 1);
588 rankSort(int num_columns
, pivot_field
*piv_columns
)
590 int *hmap
; /* [[offset in piv_columns, rank], ...for
591 * every header entry] */
594 hmap
= (int *) pg_malloc(sizeof(int) * num_columns
* 2);
595 for (i
= 0; i
< num_columns
; i
++)
597 char *val
= piv_columns
[i
].sort_value
;
599 /* ranking information is valid if non null and matches /^-?\d+$/ */
602 strspn(val
+ 1, "0123456789") == strlen(val
+ 1)) ||
603 strspn(val
, "0123456789") == strlen(val
)))
605 hmap
[i
* 2] = atoi(val
);
610 /* invalid rank information ignored (equivalent to rank 0) */
616 qsort(hmap
, num_columns
, sizeof(int) * 2, rankCompare
);
618 for (i
= 0; i
< num_columns
; i
++)
620 piv_columns
[hmap
[i
* 2 + 1]].rank
= i
;
627 * Look up a column reference, which can be either:
628 * - a number from 1 to PQnfields(res)
629 * - a column name matching one of PQfname(res,...)
631 * Returns zero-based column number, or -1 if not found or ambiguous.
633 * Note: may modify contents of "arg" string.
636 indexOfColumn(char *arg
, const PGresult
*res
)
640 if (arg
[0] && strspn(arg
, "0123456789") == strlen(arg
))
642 /* if arg contains only digits, it's a column number */
644 if (idx
< 0 || idx
>= PQnfields(res
))
646 pg_log_error("\\crosstabview: column number %d is out of range 1..%d",
647 idx
+ 1, PQnfields(res
));
656 * Dequote and downcase the column name. By checking for all-digits
657 * before doing this, we can ensure that a quoted name is treated as a
658 * name even if it's all digits.
660 dequote_downcase_identifier(arg
, true, pset
.encoding
);
662 /* Now look for match(es) among res' column names */
664 for (i
= 0; i
< PQnfields(res
); i
++)
666 if (strcmp(arg
, PQfname(res
, i
)) == 0)
670 /* another idx was already found for the same name */
671 pg_log_error("\\crosstabview: ambiguous column name: \"%s\"", arg
);
679 pg_log_error("\\crosstabview: column name not found: \"%s\"", arg
);
688 * Value comparator for vertical and horizontal headers
689 * used for deduplication only.
690 * - null values are considered equal
692 * - non-null values are compared with strcmp()
695 pivotFieldCompare(const void *a
, const void *b
)
697 const pivot_field
*pa
= (const pivot_field
*) a
;
698 const pivot_field
*pb
= (const pivot_field
*) b
;
700 /* test null values */
702 return pa
->name
? -1 : 0;
706 /* non-null values */
707 return strcmp(pa
->name
, pb
->name
);
/*
 * qsort() comparator used by rankSort().  a and b point to int arrays; only
 * the first int of each is compared, using pg_cmp_s32().
 */
static int
rankCompare(const void *a, const void *b)
{
	return pg_cmp_s32(*(const int *) a, *(const int *) b);
}