/*
 * psql - the PostgreSQL interactive terminal
 *
 * Copyright (c) 2000-2021, PostgreSQL Global Development Group
 *
 * src/bin/psql/crosstabview.c
 */
8 #include "postgres_fe.h"
11 #include "common/logging.h"
12 #include "crosstabview.h"
13 #include "pqexpbuffer.h"
14 #include "psqlscanslash.h"
/*
 * Value/position from the resultset that goes into the horizontal or vertical
 * crosstabview header.
 */
typedef struct _pivot_field
{
	/*
	 * Pointer obtained from PQgetvalue() for colV or colH. Each distinct
	 * value becomes an entry in the vertical header (colV), or horizontal
	 * header (colH). A Null value is represented by a NULL pointer.
	 */
	char	   *name;

	/*
	 * When a sort is requested on an alternative column, this holds
	 * PQgetvalue() for the sort column corresponding to <name>. If <name>
	 * appears multiple times, it's the first value in the order of the
	 * results that is kept. A Null value is represented by a NULL pointer.
	 */
	char	   *sort_value;

	/*
	 * Rank of this value, starting at 0. Initially, it's the relative
	 * position of the first appearance of <name> in the resultset. For
	 * example, if successive rows contain B,A,C,A,D then it's B:0,A:1,C:2,D:3
	 * When a sort column is specified, ranks get updated in a final pass to
	 * reflect the desired order.
	 */
	int			rank;
} pivot_field;
48 /* Node in avl_tree */
49 typedef struct _avl_node
55 * Height of this node in the tree (number of nodes on the longest path to
61 * Child nodes. [0] points to left subtree, [1] to right subtree. Never
62 * NULL, points to the empty node avl_tree.end when no left or right
65 struct _avl_node
*children
[2];
69 * Control structure for the AVL tree (binary search tree kept
70 * balanced with the AVL algorithm)
72 typedef struct _avl_tree
74 int count
; /* Total number of nodes */
75 avl_node
*root
; /* root of the tree */
76 avl_node
*end
; /* Immutable dereferenceable empty tree */
80 static bool printCrosstab(const PGresult
*results
,
81 int num_columns
, pivot_field
*piv_columns
, int field_for_columns
,
82 int num_rows
, pivot_field
*piv_rows
, int field_for_rows
,
84 static void avlInit(avl_tree
*tree
);
85 static void avlMergeValue(avl_tree
*tree
, char *name
, char *sort_value
);
86 static int avlCollectFields(avl_tree
*tree
, avl_node
*node
,
87 pivot_field
*fields
, int idx
);
88 static void avlFree(avl_tree
*tree
, avl_node
*node
);
89 static void rankSort(int num_columns
, pivot_field
*piv_columns
);
90 static int indexOfColumn(char *arg
, const PGresult
*res
);
91 static int pivotFieldCompare(const void *a
, const void *b
);
92 static int rankCompare(const void *a
, const void *b
);
96 * Main entry point to this module.
98 * Process the data from *res according to the options in pset (global),
99 * to generate the horizontal and vertical headers contents,
100 * then call printCrosstab() for the actual output.
103 PrintResultsInCrosstab(const PGresult
*res
)
106 avl_tree piv_columns
;
108 pivot_field
*array_columns
= NULL
;
109 pivot_field
*array_rows
= NULL
;
113 int field_for_columns
;
115 int sort_field_for_columns
;
119 avlInit(&piv_columns
);
121 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
123 pg_log_error("\\crosstabview: statement did not return a result set");
127 if (PQnfields(res
) < 3)
129 pg_log_error("\\crosstabview: query must return at least three columns");
133 /* Process first optional arg (vertical header column) */
134 if (pset
.ctv_args
[0] == NULL
)
138 field_for_rows
= indexOfColumn(pset
.ctv_args
[0], res
);
139 if (field_for_rows
< 0)
143 /* Process second optional arg (horizontal header column) */
144 if (pset
.ctv_args
[1] == NULL
)
145 field_for_columns
= 1;
148 field_for_columns
= indexOfColumn(pset
.ctv_args
[1], res
);
149 if (field_for_columns
< 0)
153 /* Insist that header columns be distinct */
154 if (field_for_columns
== field_for_rows
)
156 pg_log_error("\\crosstabview: vertical and horizontal headers must be different columns");
160 /* Process third optional arg (data column) */
161 if (pset
.ctv_args
[2] == NULL
)
166 * If the data column was not specified, we search for the one not
167 * used as either vertical or horizontal headers. Must be exactly
168 * three columns, or this won't be unique.
170 if (PQnfields(res
) != 3)
172 pg_log_error("\\crosstabview: data column must be specified when query returns more than three columns");
177 for (i
= 0; i
< PQnfields(res
); i
++)
179 if (i
!= field_for_rows
&& i
!= field_for_columns
)
185 Assert(field_for_data
>= 0);
189 field_for_data
= indexOfColumn(pset
.ctv_args
[2], res
);
190 if (field_for_data
< 0)
194 /* Process fourth optional arg (horizontal header sort column) */
195 if (pset
.ctv_args
[3] == NULL
)
196 sort_field_for_columns
= -1; /* no sort column */
199 sort_field_for_columns
= indexOfColumn(pset
.ctv_args
[3], res
);
200 if (sort_field_for_columns
< 0)
205 * First part: accumulate the names that go into the vertical and
206 * horizontal headers, each into an AVL binary tree to build the set of
210 for (rn
= 0; rn
< PQntuples(res
); rn
++)
216 val
= PQgetisnull(res
, rn
, field_for_columns
) ? NULL
:
217 PQgetvalue(res
, rn
, field_for_columns
);
220 if (sort_field_for_columns
>= 0 &&
221 !PQgetisnull(res
, rn
, sort_field_for_columns
))
222 val1
= PQgetvalue(res
, rn
, sort_field_for_columns
);
224 avlMergeValue(&piv_columns
, val
, val1
);
226 if (piv_columns
.count
> CROSSTABVIEW_MAX_COLUMNS
)
228 pg_log_error("\\crosstabview: maximum number of columns (%d) exceeded",
229 CROSSTABVIEW_MAX_COLUMNS
);
234 val
= PQgetisnull(res
, rn
, field_for_rows
) ? NULL
:
235 PQgetvalue(res
, rn
, field_for_rows
);
237 avlMergeValue(&piv_rows
, val
, NULL
);
241 * Second part: Generate sorted arrays from the AVL trees.
244 num_columns
= piv_columns
.count
;
245 num_rows
= piv_rows
.count
;
247 array_columns
= (pivot_field
*)
248 pg_malloc(sizeof(pivot_field
) * num_columns
);
250 array_rows
= (pivot_field
*)
251 pg_malloc(sizeof(pivot_field
) * num_rows
);
253 avlCollectFields(&piv_columns
, piv_columns
.root
, array_columns
, 0);
254 avlCollectFields(&piv_rows
, piv_rows
.root
, array_rows
, 0);
257 * Third part: optionally, process the ranking data for the horizontal
260 if (sort_field_for_columns
>= 0)
261 rankSort(num_columns
, array_columns
);
264 * Fourth part: print the crosstab'ed results.
266 retval
= printCrosstab(res
,
267 num_columns
, array_columns
, field_for_columns
,
268 num_rows
, array_rows
, field_for_rows
,
272 avlFree(&piv_columns
, piv_columns
.root
);
273 avlFree(&piv_rows
, piv_rows
.root
);
274 pg_free(array_columns
);
281 * Output the pivoted resultset with the printTable* functions. Return true
282 * if successful, false otherwise.
285 printCrosstab(const PGresult
*results
,
286 int num_columns
, pivot_field
*piv_columns
, int field_for_columns
,
287 int num_rows
, pivot_field
*piv_rows
, int field_for_rows
,
290 printQueryOpt popt
= pset
.popt
;
291 printTableContent cont
;
298 printTableInit(&cont
, &popt
.topt
, popt
.title
, num_columns
+ 1, num_rows
);
300 /* Step 1: set target column names (horizontal header) */
302 /* The name of the first column is kept unchanged by the pivoting */
303 printTableAddHeader(&cont
,
304 PQfname(results
, field_for_rows
),
306 column_type_alignment(PQftype(results
,
310 * To iterate over piv_columns[] by piv_columns[].rank, create a reverse
311 * map associating each piv_columns[].rank to its index in piv_columns.
312 * This avoids an O(N^2) loop later.
314 horiz_map
= (int *) pg_malloc(sizeof(int) * num_columns
);
315 for (i
= 0; i
< num_columns
; i
++)
316 horiz_map
[piv_columns
[i
].rank
] = i
;
319 * The display alignment depends on its PQftype().
321 col_align
= column_type_alignment(PQftype(results
, field_for_data
));
323 for (i
= 0; i
< num_columns
; i
++)
327 colname
= piv_columns
[horiz_map
[i
]].name
?
328 piv_columns
[horiz_map
[i
]].name
:
329 (popt
.nullPrint
? popt
.nullPrint
: "");
331 printTableAddHeader(&cont
, colname
, false, col_align
);
335 /* Step 2: set row names in the first output column (vertical header) */
336 for (i
= 0; i
< num_rows
; i
++)
338 int k
= piv_rows
[i
].rank
;
340 cont
.cells
[k
* (num_columns
+ 1)] = piv_rows
[i
].name
?
342 (popt
.nullPrint
? popt
.nullPrint
: "");
344 cont
.cellsadded
= num_rows
* (num_columns
+ 1);
347 * Step 3: fill in the content cells.
349 for (rn
= 0; rn
< PQntuples(results
); rn
++)
357 /* Find target row */
358 if (!PQgetisnull(results
, rn
, field_for_rows
))
359 elt
.name
= PQgetvalue(results
, rn
, field_for_rows
);
362 rp
= (pivot_field
*) bsearch(&elt
,
368 row_number
= rp
->rank
;
370 /* Find target column */
371 if (!PQgetisnull(results
, rn
, field_for_columns
))
372 elt
.name
= PQgetvalue(results
, rn
, field_for_columns
);
376 cp
= (pivot_field
*) bsearch(&elt
,
382 col_number
= cp
->rank
;
384 /* Place value into cell */
385 if (col_number
>= 0 && row_number
>= 0)
389 /* index into the cont.cells array */
390 idx
= 1 + col_number
+ row_number
* (num_columns
+ 1);
393 * If the cell already contains a value, raise an error.
395 if (cont
.cells
[idx
] != NULL
)
397 pg_log_error("\\crosstabview: query result contains multiple data values for row \"%s\", column \"%s\"",
398 rp
->name
? rp
->name
:
399 (popt
.nullPrint
? popt
.nullPrint
: "(null)"),
400 cp
->name
? cp
->name
:
401 (popt
.nullPrint
? popt
.nullPrint
: "(null)"));
405 cont
.cells
[idx
] = !PQgetisnull(results
, rn
, field_for_data
) ?
406 PQgetvalue(results
, rn
, field_for_data
) :
407 (popt
.nullPrint
? popt
.nullPrint
: "");
412 * The non-initialized cells must be set to an empty string for the print
415 for (i
= 0; i
< cont
.cellsadded
; i
++)
417 if (cont
.cells
[i
] == NULL
)
421 printTable(&cont
, pset
.queryFout
, false, pset
.logfile
);
425 printTableCleanup(&cont
);
431 * The avl* functions below provide a minimalistic implementation of AVL binary
432 * trees, to efficiently collect the distinct values that will form the horizontal
433 * and vertical headers. It only supports adding new values, no removal or even
437 avlInit(avl_tree
*tree
)
439 tree
->end
= (avl_node
*) pg_malloc0(sizeof(avl_node
));
440 tree
->end
->children
[0] = tree
->end
->children
[1] = tree
->end
;
442 tree
->root
= tree
->end
;
445 /* Deallocate recursively an AVL tree, starting from node */
447 avlFree(avl_tree
*tree
, avl_node
*node
)
449 if (node
->children
[0] != tree
->end
)
451 avlFree(tree
, node
->children
[0]);
452 pg_free(node
->children
[0]);
454 if (node
->children
[1] != tree
->end
)
456 avlFree(tree
, node
->children
[1]);
457 pg_free(node
->children
[1]);
459 if (node
== tree
->root
)
461 /* free the root separately as it's not child of anything */
462 if (node
!= tree
->end
)
464 /* free the tree->end struct only once and when all else is freed */
469 /* Set the height to 1 plus the greatest of left and right heights */
471 avlUpdateHeight(avl_node
*n
)
473 n
->height
= 1 + (n
->children
[0]->height
> n
->children
[1]->height
?
474 n
->children
[0]->height
:
475 n
->children
[1]->height
);
478 /* Rotate a subtree left (dir=0) or right (dir=1). Not recursive */
480 avlRotate(avl_node
**current
, int dir
)
482 avl_node
*before
= *current
;
483 avl_node
*after
= (*current
)->children
[dir
];
486 before
->children
[dir
] = after
->children
[!dir
];
487 avlUpdateHeight(before
);
488 after
->children
[!dir
] = before
;
494 avlBalance(avl_node
*n
)
496 return n
->children
[0]->height
- n
->children
[1]->height
;
500 * After an insertion, possibly rebalance the tree so that the left and right
501 * node heights don't differ by more than 1.
505 avlAdjustBalance(avl_tree
*tree
, avl_node
**node
)
507 avl_node
*current
= *node
;
508 int b
= avlBalance(current
) / 2;
512 int dir
= (1 - b
) / 2;
514 if (avlBalance(current
->children
[dir
]) == -b
)
515 avlRotate(¤t
->children
[dir
], !dir
);
516 current
= avlRotate(node
, dir
);
518 if (current
!= tree
->end
)
519 avlUpdateHeight(current
);
523 * Insert a new value/field, starting from *node, reaching the correct position
524 * in the tree by recursion. Possibly rebalance the tree and possibly update
525 * *node. Do nothing if the value is already present in the tree.
528 avlInsertNode(avl_tree
*tree
, avl_node
**node
, pivot_field field
)
530 avl_node
*current
= *node
;
532 if (current
== tree
->end
)
534 avl_node
*new_node
= (avl_node
*)
535 pg_malloc(sizeof(avl_node
));
537 new_node
->height
= 1;
538 new_node
->field
= field
;
539 new_node
->children
[0] = new_node
->children
[1] = tree
->end
;
545 int cmp
= pivotFieldCompare(&field
, ¤t
->field
);
550 cmp
> 0 ? ¤t
->children
[1] : ¤t
->children
[0],
552 avlAdjustBalance(tree
, node
);
557 /* Insert the value into the AVL tree, if it does not preexist */
559 avlMergeValue(avl_tree
*tree
, char *name
, char *sort_value
)
564 field
.rank
= tree
->count
;
565 field
.sort_value
= sort_value
;
566 avlInsertNode(tree
, &tree
->root
, field
);
570 * Recursively extract node values into the names array, in sorted order with a
571 * left-to-right tree traversal.
572 * Return the next candidate offset to write into the names array.
573 * fields[] must be preallocated to hold tree->count entries
576 avlCollectFields(avl_tree
*tree
, avl_node
*node
, pivot_field
*fields
, int idx
)
578 if (node
== tree
->end
)
581 idx
= avlCollectFields(tree
, node
->children
[0], fields
, idx
);
582 fields
[idx
] = node
->field
;
583 return avlCollectFields(tree
, node
->children
[1], fields
, idx
+ 1);
587 rankSort(int num_columns
, pivot_field
*piv_columns
)
589 int *hmap
; /* [[offset in piv_columns, rank], ...for
590 * every header entry] */
593 hmap
= (int *) pg_malloc(sizeof(int) * num_columns
* 2);
594 for (i
= 0; i
< num_columns
; i
++)
596 char *val
= piv_columns
[i
].sort_value
;
598 /* ranking information is valid if non null and matches /^-?\d+$/ */
601 strspn(val
+ 1, "0123456789") == strlen(val
+ 1)) ||
602 strspn(val
, "0123456789") == strlen(val
)))
604 hmap
[i
* 2] = atoi(val
);
609 /* invalid rank information ignored (equivalent to rank 0) */
615 qsort(hmap
, num_columns
, sizeof(int) * 2, rankCompare
);
617 for (i
= 0; i
< num_columns
; i
++)
619 piv_columns
[hmap
[i
* 2 + 1]].rank
= i
;
626 * Look up a column reference, which can be either:
627 * - a number from 1 to PQnfields(res)
628 * - a column name matching one of PQfname(res,...)
630 * Returns zero-based column number, or -1 if not found or ambiguous.
632 * Note: may modify contents of "arg" string.
635 indexOfColumn(char *arg
, const PGresult
*res
)
639 if (arg
[0] && strspn(arg
, "0123456789") == strlen(arg
))
641 /* if arg contains only digits, it's a column number */
643 if (idx
< 0 || idx
>= PQnfields(res
))
645 pg_log_error("\\crosstabview: column number %d is out of range 1..%d",
646 idx
+ 1, PQnfields(res
));
655 * Dequote and downcase the column name. By checking for all-digits
656 * before doing this, we can ensure that a quoted name is treated as a
657 * name even if it's all digits.
659 dequote_downcase_identifier(arg
, true, pset
.encoding
);
661 /* Now look for match(es) among res' column names */
663 for (i
= 0; i
< PQnfields(res
); i
++)
665 if (strcmp(arg
, PQfname(res
, i
)) == 0)
669 /* another idx was already found for the same name */
670 pg_log_error("\\crosstabview: ambiguous column name: \"%s\"", arg
);
678 pg_log_error("\\crosstabview: column name not found: \"%s\"", arg
);
687 * Value comparator for vertical and horizontal headers
688 * used for deduplication only.
689 * - null values are considered equal
691 * - non-null values are compared with strcmp()
694 pivotFieldCompare(const void *a
, const void *b
)
696 const pivot_field
*pa
= (const pivot_field
*) a
;
697 const pivot_field
*pb
= (const pivot_field
*) b
;
699 /* test null values */
701 return pa
->name
? -1 : 0;
705 /* non-null values */
706 return strcmp(pa
->name
, pb
->name
);
/*
 * qsort() comparator ordering elements by their leading int.
 *
 * a and b each point to an int; any data that follows it in the element is
 * ignored.  The result is computed with comparisons rather than "x - y",
 * because the subtraction overflows (undefined behavior, wrong sign) for
 * operands such as INT_MAX and -1.
 *
 * Returns -1, 0 or 1.
 */
static int
rankCompare(const void *a, const void *b)
{
	int			x = *((const int *) a);
	int			y = *((const int *) b);

	return (x > y) - (x < y);
}