Independent Samples T-Test Dialog: Fix Crash
[pspp.git] / src / data / psql-reader.c
blob21354e0d7c6046be0f5564c74a27969b668e807f
1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
17 #include <config.h>
19 #include "data/psql-reader.h"
21 #include <inttypes.h>
22 #include <math.h>
23 #include <stdlib.h>
25 #include "data/calendar.h"
26 #include "data/casereader-provider.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/variable.h"
30 #include "libpspp/i18n.h"
31 #include "libpspp/message.h"
32 #include "libpspp/misc.h"
33 #include "libpspp/str.h"
35 #include "gl/c-strcase.h"
36 #include "gl/minmax.h"
37 #include "gl/xalloc.h"
39 #include "gettext.h"
40 #define _(msgid) gettext (msgid)
41 #define N_(msgid) (msgid)
44 #if !PSQL_SUPPORT
45 struct casereader *
46 psql_open_reader (struct psql_read_info *info UNUSED, struct dictionary **dict UNUSED)
48 msg (ME, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
50 return NULL;
53 #else
55 #include <stdint.h>
56 #include <libpq-fe.h>
59 /* Default width of string variables. */
60 #define PSQL_DEFAULT_WIDTH 8
/* These macros must have the same values as the type OIDs in
   catalog/pg_type.h from the postgres source. */
63 #define BOOLOID 16
64 #define BYTEAOID 17
65 #define CHAROID 18
66 #define NAMEOID 19
67 #define INT8OID 20
68 #define INT2OID 21
69 #define INT4OID 23
70 #define TEXTOID 25
71 #define OIDOID 26
72 #define FLOAT4OID 700
73 #define FLOAT8OID 701
74 #define CASHOID 790
75 #define BPCHAROID 1042
76 #define VARCHAROID 1043
77 #define DATEOID 1082
78 #define TIMEOID 1083
79 #define TIMESTAMPOID 1114
80 #define TIMESTAMPTZOID 1184
81 #define INTERVALOID 1186
82 #define TIMETZOID 1266
83 #define NUMERICOID 1700
85 static void psql_casereader_destroy (struct casereader *reader UNUSED, void *r_);
87 static struct ccase *psql_casereader_read (struct casereader *, void *);
89 static const struct casereader_class psql_casereader_class =
91 psql_casereader_read,
92 psql_casereader_destroy,
93 NULL,
94 NULL,
97 struct psql_reader
99 PGconn *conn;
100 PGresult *res;
101 int tuple;
103 bool integer_datetimes;
105 double postgres_epoch;
107 struct caseproto *proto;
108 struct dictionary *dict;
110 /* An array of ints, which maps psql column numbers into
111 pspp variables */
112 struct variable **vmap;
113 size_t vmapsize;
115 struct string fetch_cmd;
116 int cache_size;
120 static struct ccase *set_value (struct psql_reader *r);
/* Copies the LEN bytes at IN_ to OUT_, treating the input as big-endian
   and the output as host byte order: on a big-endian host
   (WORDS_BIGENDIAN nonzero) the bytes are copied unchanged, otherwise
   their order is reversed.  IN_ and OUT_ must not overlap. */
static void
data_to_native (const void *in_, void *out_, int len)
{
  const unsigned char *src = in_;
  unsigned char *dst = out_;
  int k;

#if WORDS_BIGENDIAN
  /* Byte orders already agree: plain copy. */
  for (k = 0; k < len; k++)
    dst[k] = src[k];
#else
  /* Fill the destination from its last byte towards its first while
     walking the source forwards. */
  for (k = len; k > 0; k--)
    dst[k - 1] = src[len - k];
#endif
}
/* Decodes one fixed-size value from a binary result buffer.

   IN is the address of a byte pointer into the buffer and OUT is an
   lvalue of the desired type.  sizeof (OUT) bytes are converted to host
   byte order with data_to_native() and stored in OUT, then *IN is
   advanced past them.  No bounds checking is done; the caller must know
   that enough bytes remain.

   Both arguments are parenthesized in the expansion, and the temporary
   has a name unlikely to clash with a caller's variable (a caller
   passing an OUT called `sz' would otherwise be captured). */
#define GET_VALUE(IN, OUT) do {                                 \
    size_t get_value_size_ = sizeof (OUT);                      \
    data_to_native (*(IN), &(OUT), get_value_size_);            \
    *(IN) += get_value_size_;                                   \
} while (false)
#if 0
/* Debugging aid, compiled out.  Prints the L bytes at X as two lines:
   first each byte in hexadecimal, then each byte as a character, with
   non-printable bytes shown as a blank. */
static void
dump (const unsigned char *x, int l)
{
  int pos;

  pos = 0;
  while (pos < l)
    {
      printf ("%02x ", x[pos]);
      pos++;
    }
  putchar ('\n');

  pos = 0;
  while (pos < l)
    {
      if (!isprint (x[pos]))
        printf (" ");
      else
        printf ("%c ", x[pos]);
      pos++;
    }
  putchar ('\n');
}
#endif
179 static struct variable *
180 create_var (struct psql_reader *r, struct fmt_spec fmt,
181 int width, const char *suggested_name, int col)
183 struct variable *var
184 = dict_create_var_with_unique_name (r->dict, suggested_name, width);
186 var_set_both_formats (var, fmt);
188 if (col != -1)
190 r->vmap = xrealloc (r->vmap, (col + 1) * sizeof (*r->vmap));
192 r->vmap[col] = var;
193 r->vmapsize = col + 1;
196 return var;
202 /* Fill the cache */
203 static bool
204 reload_cache (struct psql_reader *r)
206 PQclear (r->res);
207 r->tuple = 0;
209 r->res = PQexec (r->conn, ds_cstr (&r->fetch_cmd));
211 if (PQresultStatus (r->res) != PGRES_TUPLES_OK || PQntuples (r->res) < 1)
213 PQclear (r->res);
214 r->res = NULL;
215 return false;
218 return true;
222 struct casereader *
223 psql_open_reader (struct psql_read_info *info, struct dictionary **dict)
225 int i;
226 int n_fields, n_tuples;
227 PGresult *qres = NULL;
228 casenumber n_cases = CASENUMBER_MAX;
229 const char *encoding;
231 struct psql_reader *r = XZALLOC (struct psql_reader);
233 r->conn = PQconnectdb (info->conninfo);
234 if (NULL == r->conn)
236 msg (ME, _("Memory error whilst opening psql source"));
237 goto error;
240 if (PQstatus (r->conn) != CONNECTION_OK)
242 msg (ME, _("Error opening psql source: %s."),
243 PQerrorMessage (r->conn));
245 goto error;
249 int ver_num = 0;
250 const char *vers = PQparameterStatus (r->conn, "server_version");
252 sscanf (vers, "%d", &ver_num);
254 if (ver_num < 8)
256 msg (ME,
257 _("Postgres server is version %s."
258 " Reading from versions earlier than 8.0 is not supported."),
259 vers);
261 goto error;
266 const char *dt = PQparameterStatus (r->conn, "integer_datetimes");
268 r->integer_datetimes = (0 == c_strcasecmp (dt, "on"));
271 #if USE_SSL
272 if (PQgetssl (r->conn) == NULL)
273 #endif
275 if (! info->allow_clear)
277 msg (ME, _("Connection is unencrypted, "
278 "but unencrypted connections have not been permitted."));
279 goto error;
283 r->postgres_epoch = calendar_gregorian_to_offset (
284 2000, 1, 1, settings_get_fmt_settings (), NULL);
287 const int enc = PQclientEncoding (r->conn);
289 /* According to section 22.2 of the Postgresql manual
290 a value of zero (SQL_ASCII) indicates
291 "a declaration of ignorance about the encoding".
292 Accordingly, we use the default encoding
293 if we find this value.
295 encoding = enc ? pg_encoding_to_char (enc) : get_default_encoding ();
297 /* Create the dictionary and populate it */
298 *dict = r->dict = dict_create (encoding);
301 const int version = PQserverVersion (r->conn);
303 Versions before 9.1 don't have the REPEATABLE READ isolation level.
304 However according to <a12321aabb@gmail.com> if the server is in the
305 "hot standby" mode then SERIALIZABLE won't work.
307 char *query = xasprintf (
308 "BEGIN READ ONLY ISOLATION LEVEL %s; "
309 "DECLARE pspp BINARY CURSOR FOR %s",
310 (version < 90100) ? "SERIALIZABLE" : "REPEATABLE READ",
311 info->sql);
312 qres = PQexec (r->conn, query);
313 free (query);
315 if (PQresultStatus (qres) != PGRES_COMMAND_OK)
317 msg (ME, _("Error from psql source: %s."),
318 PQresultErrorMessage (qres));
319 goto error;
322 PQclear (qres);
325 /* Now use the count() function to find the total number of cases
326 that this query returns.
327 Doing this incurs some overhead. The server has to iterate every
328 case in order to find this number. However, it's performed on the
329 server side, and in all except the most huge databases the extra
330 overhead will be worth the effort.
331 On the other hand, most PSPP functions don't need to know this.
332 The GUI is the notable exception.
334 query = xasprintf ("SELECT count (*) FROM (%s) stupid_sql_standard",
335 info->sql);
336 qres = PQexec (r->conn, query);
337 free (query);
339 if (PQresultStatus (qres) != PGRES_TUPLES_OK)
341 msg (ME, _("Error from psql source: %s."),
342 PQresultErrorMessage (qres));
343 goto error;
345 n_cases = atol (PQgetvalue (qres, 0, 0));
346 PQclear (qres);
348 qres = PQexec (r->conn, "FETCH FIRST FROM pspp");
349 if (PQresultStatus (qres) != PGRES_TUPLES_OK)
351 msg (ME, _("Error from psql source: %s."),
352 PQresultErrorMessage (qres));
353 goto error;
356 n_tuples = PQntuples (qres);
357 n_fields = PQnfields (qres);
359 r->proto = NULL;
360 r->vmap = NULL;
361 r->vmapsize = 0;
363 for (i = 0 ; i < n_fields ; ++i)
365 struct variable *var;
366 struct fmt_spec fmt = { .type = FMT_F, .w = 8, .d = 2 };
367 Oid type = PQftype (qres, i);
368 int width = 0;
369 int length ;
371 /* If there are no data then make a finger in the air
372 guess at the contents */
373 if (n_tuples > 0)
374 length = PQgetlength (qres, 0, i);
375 else
376 length = PSQL_DEFAULT_WIDTH;
378 switch (type)
380 case BOOLOID:
381 case OIDOID:
382 case INT2OID:
383 case INT4OID:
384 case INT8OID:
385 case FLOAT4OID:
386 case FLOAT8OID:
387 fmt.type = FMT_F;
388 break;
389 case CASHOID:
390 fmt.type = FMT_DOLLAR;
391 break;
392 case CHAROID:
393 fmt.type = FMT_A;
394 width = length > 0 ? length : 1;
395 fmt.d = 0;
396 fmt.w = 1;
397 break;
398 case TEXTOID:
399 case VARCHAROID:
400 case BPCHAROID:
401 fmt.type = FMT_A;
402 width = (info->str_width == -1) ?
403 ROUND_UP (length, PSQL_DEFAULT_WIDTH) : info->str_width;
404 fmt.w = width;
405 fmt.d = 0;
406 break;
407 case BYTEAOID:
408 fmt.type = FMT_AHEX;
409 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
410 fmt.w = width * 2;
411 fmt.d = 0;
412 break;
413 case INTERVALOID:
414 fmt.type = FMT_DTIME;
415 width = 0;
416 fmt.d = 0;
417 fmt.w = 13;
418 break;
419 case DATEOID:
420 fmt.type = FMT_DATE;
421 width = 0;
422 fmt.w = 11;
423 fmt.d = 0;
424 break;
425 case TIMEOID:
426 case TIMETZOID:
427 fmt.type = FMT_TIME;
428 width = 0;
429 fmt.w = 11;
430 fmt.d = 0;
431 break;
432 case TIMESTAMPOID:
433 case TIMESTAMPTZOID:
434 fmt.type = FMT_DATETIME;
435 fmt.d = 0;
436 fmt.w = 22;
437 width = 0;
438 break;
439 case NUMERICOID:
440 fmt.type = FMT_E;
441 fmt.d = 2;
442 fmt.w = 40;
443 width = 0;
444 break;
445 default:
446 msg (MW, _("Unsupported OID %d. SYSMIS values will be inserted."), type);
447 fmt.type = FMT_A;
448 width = length > 0 ? length : PSQL_DEFAULT_WIDTH;
449 fmt.w = width ;
450 fmt.d = 0;
451 break;
454 if (width == 0 && fmt_is_string (fmt.type))
455 fmt.w = width = PSQL_DEFAULT_WIDTH;
458 var = create_var (r, fmt, width, PQfname (qres, i), i);
459 if (type == NUMERICOID && n_tuples > 0)
461 const uint8_t *vptr = (const uint8_t *) PQgetvalue (qres, 0, i);
462 struct fmt_spec fmt;
463 int16_t n_digits, weight, dscale;
464 uint16_t sign;
466 GET_VALUE (&vptr, n_digits);
467 GET_VALUE (&vptr, weight);
468 GET_VALUE (&vptr, sign);
469 GET_VALUE (&vptr, dscale);
471 fmt.d = dscale;
472 fmt.type = FMT_E;
473 fmt.w = fmt_max_output_width (fmt.type) ;
474 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
475 var_set_both_formats (var, fmt);
478 /* Timezones need an extra variable */
479 switch (type)
481 case TIMETZOID:
483 struct string name;
484 ds_init_cstr (&name, var_get_name (var));
485 ds_put_cstr (&name, "-zone");
486 fmt.type = FMT_F;
487 fmt.w = 8;
488 fmt.d = 2;
490 create_var (r, fmt, 0, ds_cstr (&name), -1);
492 ds_destroy (&name);
494 break;
496 case INTERVALOID:
498 struct string name;
499 ds_init_cstr (&name, var_get_name (var));
500 ds_put_cstr (&name, "-months");
501 fmt.type = FMT_F;
502 fmt.w = 3;
503 fmt.d = 0;
505 create_var (r, fmt, 0, ds_cstr (&name), -1);
507 ds_destroy (&name);
509 default:
510 break;
514 PQclear (qres);
516 qres = PQexec (r->conn, "MOVE BACKWARD 1 FROM pspp");
517 if (PQresultStatus (qres) != PGRES_COMMAND_OK)
519 PQclear (qres);
520 goto error;
522 PQclear (qres);
524 r->cache_size = info->bsize != -1 ? info->bsize: 4096;
526 ds_init_empty (&r->fetch_cmd);
527 ds_put_format (&r->fetch_cmd, "FETCH FORWARD %d FROM pspp", r->cache_size);
529 reload_cache (r);
530 r->proto = caseproto_ref (dict_get_proto (*dict));
532 return casereader_create_sequential
533 (NULL,
534 r->proto,
535 n_cases,
536 &psql_casereader_class, r);
538 error:
539 dict_unref (*dict);
541 psql_casereader_destroy (NULL, r);
542 return NULL;
546 static void
547 psql_casereader_destroy (struct casereader *reader UNUSED, void *r_)
549 struct psql_reader *r = r_;
550 if (r == NULL)
551 return ;
553 ds_destroy (&r->fetch_cmd);
554 free (r->vmap);
555 if (r->res) PQclear (r->res);
556 PQfinish (r->conn);
557 caseproto_unref (r->proto);
559 free (r);
564 static struct ccase *
565 psql_casereader_read (struct casereader *reader UNUSED, void *r_)
567 struct psql_reader *r = r_;
569 if (NULL == r->res || r->tuple >= r->cache_size)
571 if (! reload_cache (r))
572 return false;
575 return set_value (r);
578 static struct ccase *
579 set_value (struct psql_reader *r)
581 struct ccase *c;
582 int n_vars;
583 int i;
585 assert (r->res);
587 n_vars = PQnfields (r->res);
589 if (r->tuple >= PQntuples (r->res))
590 return NULL;
592 c = case_create (r->proto);
593 case_set_missing (c);
596 for (i = 0 ; i < n_vars ; ++i)
598 Oid type = PQftype (r->res, i);
599 const struct variable *v = r->vmap[i];
600 union value *val = case_data_rw (c, v);
602 union value *val1 = NULL;
604 switch (type)
606 case INTERVALOID:
607 case TIMESTAMPTZOID:
608 case TIMETZOID:
609 if (i < r->vmapsize && var_get_dict_index(v) + 1 < dict_get_n_vars (r->dict))
611 const struct variable *v1 = NULL;
612 v1 = dict_get_var (r->dict, var_get_dict_index (v) + 1);
614 val1 = case_data_rw (c, v1);
616 break;
617 default:
618 break;
622 if (PQgetisnull (r->res, r->tuple, i))
624 value_set_missing (val, var_get_width (v));
626 switch (type)
628 case INTERVALOID:
629 case TIMESTAMPTZOID:
630 case TIMETZOID:
631 val1->f = SYSMIS;
632 break;
633 default:
634 break;
637 else
639 const uint8_t *vptr = (const uint8_t *) PQgetvalue (r->res, r->tuple, i);
640 int length = PQgetlength (r->res, r->tuple, i);
642 int var_width = var_get_width (v);
643 switch (type)
645 case BOOLOID:
647 int8_t x;
648 GET_VALUE (&vptr, x);
649 val->f = x;
651 break;
653 case OIDOID:
654 case INT2OID:
656 int16_t x;
657 GET_VALUE (&vptr, x);
658 val->f = x;
660 break;
662 case INT4OID:
664 int32_t x;
665 GET_VALUE (&vptr, x);
666 val->f = x;
668 break;
670 case INT8OID:
672 int64_t x;
673 GET_VALUE (&vptr, x);
674 val->f = x;
676 break;
678 case FLOAT4OID:
680 float n;
681 GET_VALUE (&vptr, n);
682 val->f = n;
684 break;
686 case FLOAT8OID:
688 double n;
689 GET_VALUE (&vptr, n);
690 val->f = n;
692 break;
694 case CASHOID:
696 /* Postgres 8.3 uses 64 bits.
697 Earlier versions use 32 */
698 switch (length)
700 case 8:
702 int64_t x;
703 GET_VALUE (&vptr, x);
704 val->f = x / 100.0;
706 break;
707 case 4:
709 int32_t x;
710 GET_VALUE (&vptr, x);
711 val->f = x / 100.0;
713 break;
714 default:
715 val->f = SYSMIS;
716 break;
719 break;
721 case INTERVALOID:
723 if (r->integer_datetimes)
725 uint32_t months;
726 uint32_t days;
727 uint32_t us;
728 uint32_t things;
730 GET_VALUE (&vptr, things);
731 GET_VALUE (&vptr, us);
732 GET_VALUE (&vptr, days);
733 GET_VALUE (&vptr, months);
735 val->f = us / 1000000.0;
736 val->f += days * 24 * 3600;
738 val1->f = months;
740 else
742 uint32_t days, months;
743 double seconds;
745 GET_VALUE (&vptr, seconds);
746 GET_VALUE (&vptr, days);
747 GET_VALUE (&vptr, months);
749 val->f = seconds;
750 val->f += days * 24 * 3600;
752 val1->f = months;
755 break;
757 case DATEOID:
759 int32_t x;
761 GET_VALUE (&vptr, x);
763 val->f = (x + r->postgres_epoch) * 24 * 3600 ;
765 break;
767 case TIMEOID:
769 if (r->integer_datetimes)
771 uint64_t x;
772 GET_VALUE (&vptr, x);
773 val->f = x / 1000000.0;
775 else
777 double x;
778 GET_VALUE (&vptr, x);
779 val->f = x;
782 break;
784 case TIMETZOID:
786 int32_t zone;
787 if (r->integer_datetimes)
789 uint64_t x;
792 GET_VALUE (&vptr, x);
793 val->f = x / 1000000.0;
795 else
797 double x;
799 GET_VALUE (&vptr, x);
800 val->f = x ;
803 GET_VALUE (&vptr, zone);
804 val1->f = zone / 3600.0;
806 break;
808 case TIMESTAMPOID:
809 case TIMESTAMPTZOID:
811 if (r->integer_datetimes)
813 int64_t x;
815 GET_VALUE (&vptr, x);
817 x /= 1000000;
819 val->f = (x + r->postgres_epoch * 24 * 3600);
821 else
823 double x;
825 GET_VALUE (&vptr, x);
827 val->f = (x + r->postgres_epoch * 24 * 3600);
830 break;
831 case TEXTOID:
832 case VARCHAROID:
833 case BPCHAROID:
834 case BYTEAOID:
835 memcpy (val->s, vptr, MIN (length, var_width));
836 break;
838 case NUMERICOID:
840 double f = 0.0;
841 int i;
842 int16_t n_digits, weight, dscale;
843 uint16_t sign;
845 GET_VALUE (&vptr, n_digits);
846 GET_VALUE (&vptr, weight);
847 GET_VALUE (&vptr, sign);
848 GET_VALUE (&vptr, dscale);
850 #if 0
852 struct fmt_spec fmt;
853 fmt.d = dscale;
854 fmt.type = FMT_E;
855 fmt.w = fmt_max_output_width (fmt.type) ;
856 fmt.d = MIN (dscale, fmt_max_output_decimals (fmt.type, fmt.w));
857 var_set_both_formats (v, &fmt);
859 #endif
861 for (i = 0 ; i < n_digits; ++i)
863 uint16_t x;
864 GET_VALUE (&vptr, x);
865 f += x * pow (10000, weight--);
868 if (sign == 0x4000)
869 f *= -1.0;
871 if (sign == 0xC000)
872 val->f = SYSMIS;
873 else
874 val->f = f;
876 break;
878 default:
879 val->f = SYSMIS;
880 break;
885 r->tuple++;
887 return c;
890 #endif