1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2008, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/psql-reader.h"
25 #include "data/calendar.h"
26 #include "data/casereader-provider.h"
27 #include "data/dictionary.h"
28 #include "data/format.h"
29 #include "data/variable.h"
30 #include "libpspp/i18n.h"
31 #include "libpspp/message.h"
32 #include "libpspp/misc.h"
33 #include "libpspp/str.h"
35 #include "gl/c-strcase.h"
36 #include "gl/minmax.h"
37 #include "gl/xalloc.h"
40 #define _(msgid) gettext (msgid)
41 #define N_(msgid) (msgid)
46 psql_open_reader (struct psql_read_info
*info UNUSED
, struct dictionary
**dict UNUSED
)
48 msg (ME
, _("Support for reading postgres databases was not compiled into this installation of PSPP"));
59 /* Default width of string variables. */
60 #define PSQL_DEFAULT_WIDTH 8
62 /* These macros must match the type OID definitions in catalog/pg_type.h from the PostgreSQL source */
75 #define BPCHAROID 1042
76 #define VARCHAROID 1043
79 #define TIMESTAMPOID 1114
80 #define TIMESTAMPTZOID 1184
81 #define INTERVALOID 1186
82 #define TIMETZOID 1266
83 #define NUMERICOID 1700
85 static void psql_casereader_destroy (struct casereader
*reader UNUSED
, void *r_
);
87 static struct ccase
*psql_casereader_read (struct casereader
*, void *);
89 static const struct casereader_class psql_casereader_class
=
92 psql_casereader_destroy
,
103 bool integer_datetimes
;
105 double postgres_epoch
;
107 struct caseproto
*proto
;
108 struct dictionary
*dict
;
110 /* An array of variable pointers, which maps psql column numbers into
112 struct variable
**vmap
;
115 struct string fetch_cmd
;
120 static struct ccase
*set_value (struct psql_reader
*r
);
126 data_to_native (const void *in_
, void *out_
, int len
)
129 const unsigned char *in
= in_
;
130 unsigned char *out
= out_
;
131 for (i
= 0 ; i
< len
; ++i
)
136 data_to_native (const void *in_
, void *out_
, int len
)
139 const unsigned char *in
= in_
;
140 unsigned char *out
= out_
;
141 for (i
= 0 ; i
< len
; ++i
)
142 out
[len
- i
- 1] = in
[i
];
147 #define GET_VALUE(IN, OUT) do { \
148 size_t sz = sizeof (OUT); \
149 data_to_native (*(IN), &(OUT), sz) ; \
156 dump (const unsigned char *x
, int l
)
160 for (i
= 0; i
< l
; ++i
)
162 printf ("%02x ", x
[i
]);
167 for (i
= 0; i
< l
; ++i
)
170 printf ("%c ", x
[i
]);
179 static struct variable
*
180 create_var (struct psql_reader
*r
, struct fmt_spec fmt
,
181 int width
, const char *suggested_name
, int col
)
184 = dict_create_var_with_unique_name (r
->dict
, suggested_name
, width
);
186 var_set_both_formats (var
, fmt
);
190 r
->vmap
= xrealloc (r
->vmap
, (col
+ 1) * sizeof (*r
->vmap
));
193 r
->vmapsize
= col
+ 1;
204 reload_cache (struct psql_reader
*r
)
209 r
->res
= PQexec (r
->conn
, ds_cstr (&r
->fetch_cmd
));
211 if (PQresultStatus (r
->res
) != PGRES_TUPLES_OK
|| PQntuples (r
->res
) < 1)
223 psql_open_reader (struct psql_read_info
*info
, struct dictionary
**dict
)
226 int n_fields
, n_tuples
;
227 PGresult
*qres
= NULL
;
228 casenumber n_cases
= CASENUMBER_MAX
;
229 const char *encoding
;
231 struct psql_reader
*r
= XZALLOC (struct psql_reader
);
233 r
->conn
= PQconnectdb (info
->conninfo
);
236 msg (ME
, _("Memory error whilst opening psql source"));
240 if (PQstatus (r
->conn
) != CONNECTION_OK
)
242 msg (ME
, _("Error opening psql source: %s."),
243 PQerrorMessage (r
->conn
));
250 const char *vers
= PQparameterStatus (r
->conn
, "server_version");
252 sscanf (vers
, "%d", &ver_num
);
257 _("Postgres server is version %s."
258 " Reading from versions earlier than 8.0 is not supported."),
266 const char *dt
= PQparameterStatus (r
->conn
, "integer_datetimes");
268 r
->integer_datetimes
= (0 == c_strcasecmp (dt
, "on"));
272 if (PQgetssl (r
->conn
) == NULL
)
275 if (! info
->allow_clear
)
277 msg (ME
, _("Connection is unencrypted, "
278 "but unencrypted connections have not been permitted."));
283 r
->postgres_epoch
= calendar_gregorian_to_offset (
284 2000, 1, 1, settings_get_fmt_settings (), NULL
);
287 const int enc
= PQclientEncoding (r
->conn
);
289 /* According to section 22.2 of the Postgresql manual
290 a value of zero (SQL_ASCII) indicates
291 "a declaration of ignorance about the encoding".
292 Accordingly, we use the default encoding
293 if we find this value.
295 encoding
= enc
? pg_encoding_to_char (enc
) : get_default_encoding ();
297 /* Create the dictionary and populate it */
298 *dict
= r
->dict
= dict_create (encoding
);
301 const int version
= PQserverVersion (r
->conn
);
303 Versions before 9.1 don't have the REPEATABLE READ isolation level.
304 However according to <a12321aabb@gmail.com> if the server is in the
305 "hot standby" mode then SERIALIZABLE won't work.
307 char *query
= xasprintf (
308 "BEGIN READ ONLY ISOLATION LEVEL %s; "
309 "DECLARE pspp BINARY CURSOR FOR %s",
310 (version
< 90100) ? "SERIALIZABLE" : "REPEATABLE READ",
312 qres
= PQexec (r
->conn
, query
);
315 if (PQresultStatus (qres
) != PGRES_COMMAND_OK
)
317 msg (ME
, _("Error from psql source: %s."),
318 PQresultErrorMessage (qres
));
325 /* Now use the count() function to find the total number of cases
326 that this query returns.
327 Doing this incurs some overhead: the server has to iterate over every
328 case in order to find this number. However, the work is performed on the
329 server side, and for all but the very largest databases the extra
330 overhead will be worth the effort.
331 On the other hand, most PSPP functions don't need to know this.
332 The GUI is the notable exception.
334 query
= xasprintf ("SELECT count (*) FROM (%s) stupid_sql_standard",
336 qres
= PQexec (r
->conn
, query
);
339 if (PQresultStatus (qres
) != PGRES_TUPLES_OK
)
341 msg (ME
, _("Error from psql source: %s."),
342 PQresultErrorMessage (qres
));
345 n_cases
= atol (PQgetvalue (qres
, 0, 0));
348 qres
= PQexec (r
->conn
, "FETCH FIRST FROM pspp");
349 if (PQresultStatus (qres
) != PGRES_TUPLES_OK
)
351 msg (ME
, _("Error from psql source: %s."),
352 PQresultErrorMessage (qres
));
356 n_tuples
= PQntuples (qres
);
357 n_fields
= PQnfields (qres
);
363 for (i
= 0 ; i
< n_fields
; ++i
)
365 struct variable
*var
;
366 struct fmt_spec fmt
= { .type
= FMT_F
, .w
= 8, .d
= 2 };
367 Oid type
= PQftype (qres
, i
);
371 /* If there are no data then fall back to a rough
372 guess at the contents */
374 length
= PQgetlength (qres
, 0, i
);
376 length
= PSQL_DEFAULT_WIDTH
;
390 fmt
.type
= FMT_DOLLAR
;
394 width
= length
> 0 ? length
: 1;
402 width
= (info
->str_width
== -1) ?
403 ROUND_UP (length
, PSQL_DEFAULT_WIDTH
) : info
->str_width
;
409 width
= length
> 0 ? length
: PSQL_DEFAULT_WIDTH
;
414 fmt
.type
= FMT_DTIME
;
434 fmt
.type
= FMT_DATETIME
;
446 msg (MW
, _("Unsupported OID %d. SYSMIS values will be inserted."), type
);
448 width
= length
> 0 ? length
: PSQL_DEFAULT_WIDTH
;
454 if (width
== 0 && fmt_is_string (fmt
.type
))
455 fmt
.w
= width
= PSQL_DEFAULT_WIDTH
;
458 var
= create_var (r
, fmt
, width
, PQfname (qres
, i
), i
);
459 if (type
== NUMERICOID
&& n_tuples
> 0)
461 const uint8_t *vptr
= (const uint8_t *) PQgetvalue (qres
, 0, i
);
463 int16_t n_digits
, weight
, dscale
;
466 GET_VALUE (&vptr
, n_digits
);
467 GET_VALUE (&vptr
, weight
);
468 GET_VALUE (&vptr
, sign
);
469 GET_VALUE (&vptr
, dscale
);
473 fmt
.w
= fmt_max_output_width (fmt
.type
) ;
474 fmt
.d
= MIN (dscale
, fmt_max_output_decimals (fmt
.type
, fmt
.w
));
475 var_set_both_formats (var
, fmt
);
478 /* Timezones need an extra variable */
484 ds_init_cstr (&name
, var_get_name (var
));
485 ds_put_cstr (&name
, "-zone");
490 create_var (r
, fmt
, 0, ds_cstr (&name
), -1);
499 ds_init_cstr (&name
, var_get_name (var
));
500 ds_put_cstr (&name
, "-months");
505 create_var (r
, fmt
, 0, ds_cstr (&name
), -1);
516 qres
= PQexec (r
->conn
, "MOVE BACKWARD 1 FROM pspp");
517 if (PQresultStatus (qres
) != PGRES_COMMAND_OK
)
524 r
->cache_size
= info
->bsize
!= -1 ? info
->bsize
: 4096;
526 ds_init_empty (&r
->fetch_cmd
);
527 ds_put_format (&r
->fetch_cmd
, "FETCH FORWARD %d FROM pspp", r
->cache_size
);
530 r
->proto
= caseproto_ref (dict_get_proto (*dict
));
532 return casereader_create_sequential
536 &psql_casereader_class
, r
);
541 psql_casereader_destroy (NULL
, r
);
547 psql_casereader_destroy (struct casereader
*reader UNUSED
, void *r_
)
549 struct psql_reader
*r
= r_
;
553 ds_destroy (&r
->fetch_cmd
);
555 if (r
->res
) PQclear (r
->res
);
557 caseproto_unref (r
->proto
);
564 static struct ccase
*
565 psql_casereader_read (struct casereader
*reader UNUSED
, void *r_
)
567 struct psql_reader
*r
= r_
;
569 if (NULL
== r
->res
|| r
->tuple
>= r
->cache_size
)
571 if (! reload_cache (r
))
575 return set_value (r
);
578 static struct ccase
*
579 set_value (struct psql_reader
*r
)
587 n_vars
= PQnfields (r
->res
);
589 if (r
->tuple
>= PQntuples (r
->res
))
592 c
= case_create (r
->proto
);
593 case_set_missing (c
);
596 for (i
= 0 ; i
< n_vars
; ++i
)
598 Oid type
= PQftype (r
->res
, i
);
599 const struct variable
*v
= r
->vmap
[i
];
600 union value
*val
= case_data_rw (c
, v
);
602 union value
*val1
= NULL
;
609 if (i
< r
->vmapsize
&& var_get_dict_index(v
) + 1 < dict_get_n_vars (r
->dict
))
611 const struct variable
*v1
= NULL
;
612 v1
= dict_get_var (r
->dict
, var_get_dict_index (v
) + 1);
614 val1
= case_data_rw (c
, v1
);
622 if (PQgetisnull (r
->res
, r
->tuple
, i
))
624 value_set_missing (val
, var_get_width (v
));
639 const uint8_t *vptr
= (const uint8_t *) PQgetvalue (r
->res
, r
->tuple
, i
);
640 int length
= PQgetlength (r
->res
, r
->tuple
, i
);
642 int var_width
= var_get_width (v
);
648 GET_VALUE (&vptr
, x
);
657 GET_VALUE (&vptr
, x
);
665 GET_VALUE (&vptr
, x
);
673 GET_VALUE (&vptr
, x
);
681 GET_VALUE (&vptr
, n
);
689 GET_VALUE (&vptr
, n
);
696 /* Postgres 8.3 uses 64 bits.
697 Earlier versions use 32 */
703 GET_VALUE (&vptr
, x
);
710 GET_VALUE (&vptr
, x
);
723 if (r
->integer_datetimes
)
730 GET_VALUE (&vptr
, things
);
731 GET_VALUE (&vptr
, us
);
732 GET_VALUE (&vptr
, days
);
733 GET_VALUE (&vptr
, months
);
735 val
->f
= us
/ 1000000.0;
736 val
->f
+= days
* 24 * 3600;
742 uint32_t days
, months
;
745 GET_VALUE (&vptr
, seconds
);
746 GET_VALUE (&vptr
, days
);
747 GET_VALUE (&vptr
, months
);
750 val
->f
+= days
* 24 * 3600;
761 GET_VALUE (&vptr
, x
);
763 val
->f
= (x
+ r
->postgres_epoch
) * 24 * 3600 ;
769 if (r
->integer_datetimes
)
772 GET_VALUE (&vptr
, x
);
773 val
->f
= x
/ 1000000.0;
778 GET_VALUE (&vptr
, x
);
787 if (r
->integer_datetimes
)
792 GET_VALUE (&vptr
, x
);
793 val
->f
= x
/ 1000000.0;
799 GET_VALUE (&vptr
, x
);
803 GET_VALUE (&vptr
, zone
);
804 val1
->f
= zone
/ 3600.0;
811 if (r
->integer_datetimes
)
815 GET_VALUE (&vptr
, x
);
819 val
->f
= (x
+ r
->postgres_epoch
* 24 * 3600);
825 GET_VALUE (&vptr
, x
);
827 val
->f
= (x
+ r
->postgres_epoch
* 24 * 3600);
835 memcpy (val
->s
, vptr
, MIN (length
, var_width
));
842 int16_t n_digits
, weight
, dscale
;
845 GET_VALUE (&vptr
, n_digits
);
846 GET_VALUE (&vptr
, weight
);
847 GET_VALUE (&vptr
, sign
);
848 GET_VALUE (&vptr
, dscale
);
855 fmt
.w
= fmt_max_output_width (fmt
.type
) ;
856 fmt
.d
= MIN (dscale
, fmt_max_output_decimals (fmt
.type
, fmt
.w
));
857 var_set_both_formats (v
, &fmt
);
861 for (i
= 0 ; i
< n_digits
; ++i
)
864 GET_VALUE (&vptr
, x
);
865 f
+= x
* pow (10000, weight
--);