1 #include "libpgcli/pgconn.h"
// Size constant (8 KiB). The expansion is parenthesized so that expressions
// such as `x / PG_STACKSIZE` or `PG_STACKSIZE % y` see a single value instead
// of the token sequence `1024 * 8` spliced into the surrounding expression.
#define PG_STACKSIZE (1024 * 8)
// Default PostgreSQL TCP port in host byte order (converted with htons() by
// _pg_init before it is stored in pg_port).
#define PG_DEFAULT_PORT 5432
// Default server IPv4 address in the form held by in_addr.s_addr; written by
// _pg_init and pgconn_init, read by pg_connect.
static uint32_t pg_addr = 0;
// Default server port in the form held by sockaddr_in.sin_port; written by
// _pg_init and pgconn_init, read by pg_connect.
static unsigned short pg_port = 0;
9 //******************************************************************************
11 //******************************************************************************
// Library constructor (constructor attribute): seeds the file-level defaults
// with the loopback address produced by inet_aton("127.0.0.1") and with
// PG_DEFAULT_PORT converted by htons().
// NOTE(review): the attribute keyword is spelled `__attribute` here but
// `__attribute__` on _pg_done; consider using one spelling.
// NOTE(review): original lines 14 and 18-19 (declaration of `addr`, closing
// brace) are not visible in this listing.
12 __attribute ((constructor
))
13 static void _pg_init () {
15 inet_aton("127.0.0.1", &addr
);
16 pg_addr
= addr
.s_addr
;
17 pg_port
= htons(PG_DEFAULT_PORT
);
// Library destructor hook (destructor attribute).
// NOTE(review): the body (original lines 22 onward) is not visible in this
// listing, so nothing can be said here about what it releases.
20 __attribute__ ((destructor
))
21 static void _pg_done () {
// Converts a service string into a TCP port.
// The services database is consulted first (getservbyname(service, "tcp"));
// the visible `return servent->s_port` hands back that entry's port.
// Otherwise the string is parsed with strtol (base 0, so "0x.." and leading
// "0" prefixes are honoured) and compared against the range 1..USHRT_MAX.
// NOTE(review): original lines 26-28 and 32-35 (declarations of port/tail, the
// guard in front of line 29, and what follows the range test) are not visible.
// NOTE(review): `'\0' == *tail` is true exactly when strtol consumed the whole
// string, i.e. for a well-formed number. If the hidden branch after line 31 is
// the error return, this comparison looks inverted — confirm.
24 static int _pg_atoport (const char *service
) {
25 struct servent
*servent
= getservbyname(service
, "tcp");
29 return servent
->s_port
;
30 port
= strtol(service
, &tail
, 0);
31 if ('\0' == *tail
|| port
< 1 || port
> USHRT_MAX
)
// Replaces the file-level default endpoint: pg_port is taken from
// _pg_atoport(pg_srv_service) and pg_addr from inet_aton(pg_srv_addr).
// pg_connect later uses both values to pre-fill its sockaddr_in.
// NOTE(review): in the visible lines the inet_aton result is not checked, and
// the int returned by _pg_atoport is stored straight into the unsigned short
// pg_port. Original lines 37-38, 40 and 43-44 are not visible and may contain
// the corresponding guards — confirm.
36 void pgconn_init (const char *pg_srv_addr
, const char *pg_srv_service
) {
39 pg_port
= _pg_atoport(pg_srv_service
);
41 inet_aton(pg_srv_addr
, &addr
);
42 pg_addr
= addr
.s_addr
;
45 //******************************************************************************
47 //******************************************************************************
49 //******************************************************************************
// Returns the field at (row, col) as a pointer/length pair: ptr comes from
// PG_DATA and len from PG_FLDLEN. No bytes are copied in the visible code.
// The coordinates are validated only through assert(CHECK_BOUNDS(...)), so the
// check disappears when the file is built with NDEBUG.
// NOTE(review): `const` on a by-value return type has no effect for callers.
// NOTE(review): original lines 55-56 (presumably `return res;` and the closing
// brace) are not visible in this listing.
50 const strptr_t
pg_get (pgconn_t
*conn
, int row
, int col
) {
51 strptr_t res
= { .ptr
= NULL
, .len
= 0 };
52 assert(CHECK_BOUNDS(conn
, row
, col
));
53 res
.ptr
= (char*)PG_DATA(conn
, row
, col
);
54 res
.len
= PG_FLDLEN(conn
, row
, col
);
// Returns a heap copy of the field at (row, col): the pointer/length pair from
// pg_get is duplicated with strndup and stored in res.ptr.
// NOTE(review): presumably the caller owns and frees res.ptr — confirm.
// NOTE(review): original lines 60 and 62-65 are not visible, so any guard
// around the copy, the assignment of res.len and the return statement cannot
// be confirmed from this listing.
58 strptr_t
pg_getstr (pgconn_t
*conn
, int row
, int col
) {
59 strptr_t s
= pg_get(conn
, row
, col
), res
= { .ptr
= NULL
, .len
= 0 };
61 res
.ptr
= strndup(s
.ptr
, s
.len
);
// Reads the field at (row, col) as a pg_bit32_t and returns its `bit` member
// converted from big-endian to host byte order (be32toh).
// Bounds are checked only via assert(CHECK_BOUNDS(...)); the field length is
// not compared with the size of pg_bit32_t in the visible lines.
67 uint32_t pg_getx32 (pgconn_t
*conn
, int row
, int col
) {
68 assert(CHECK_BOUNDS(conn
, row
, col
));
69 pg_bit32_t
*res
= (pg_bit32_t
*)PG_DATA(conn
, row
, col
);
70 return be32toh(res
->bit
);
// Decodes the interval field at (row, col): the raw bytes are viewed as a
// pg_intv_t and each member is converted from big-endian to host order
// (time with be64toh, day and month with be32toh) into the local `res`.
// NOTE(review): original line 76 ends with `,` rather than `;`, so the
// assignment to `i` and the assignment to res.time form one comma expression.
// It evaluates left to right and works, but looks like a typo for `;`.
// NOTE(review): original lines 80-81 (presumably `return res;` and the closing
// brace) are not visible in this listing.
73 pg_intv_t
pg_getintv (pgconn_t
*conn
, int row
, int col
) {
74 pg_intv_t
*i
, res
= { .time
= 0, .day
= 0, .month
= 0 };
75 assert(CHECK_BOUNDS(conn
, row
, col
));
76 i
= (pg_intv_t
*)PG_DATA(conn
, row
, col
),
77 res
.time
= be64toh(i
->time
);
78 res
.day
= be32toh(i
->day
);
79 res
.month
= be32toh(i
->month
);
// Reads the field at (row, col) as an inet value. The visible test accepts the
// field only when byte 0 is 2, byte 1 is 32 and the 16-bit value at offset 2
// equals 4.
// NOTE(review): presumably these are the address family, prefix length and
// address size of the server's binary inet encoding — confirm against the
// protocol documentation.
// NOTE(review): the 16-bit comparison reads through an `unsigned short *` cast
// of a uint8_t pointer, so its result depends on host byte order and it relies
// on the address being suitably aligned.
// NOTE(review): unlike the sibling getters, no assert(CHECK_BOUNDS(...)) sits
// between the function header (line 87) and the PG_DATA access (line 88).
// NOTE(review): original lines 90 onward (the value returned) are not visible.
87 int32_t pg_getinet4 (pgconn_t
*conn
, int row
, int col
) {
88 uint8_t *v
= PG_DATA(conn
, row
, col
);
89 if (2 == v
[0] && 32 == v
[1] && 4 == *(unsigned short*)(v
+2)) {
100 //******************************************************************************
102 //******************************************************************************
// Fills *e with a description of the connection's current error state.
// Sources visible in this listing, in order:
//   - conn->intr_error & EPG_PORT  -> msg "Illegal port";
//   - conn->intr_error & EPG_ERRNO -> msg from strerror();
//   - conn->intr_error & EPG_HOST  -> resolver codes (HOST_NOT_FOUND,
//     NO_RECOVERY and two further cases) with fixed English messages;
//   - conn->error                  -> sqlstate and msg copied from the server
//     error message;
//   - otherwise                    -> sqlstate ERRCODE_SUCCESSFUL_COMPLETION.
// All msg pointers refer to string literals, strerror()'s buffer or memory
// owned by conn; nothing is allocated for the caller in the visible lines.
// NOTE(review): the errno passed to strerror() is extracted with the EPG_HOST
// mask. Confirm that this mask really covers every errno value stored by
// pg_connect as `EPG_ERRNO | errno`, or introduce a dedicated mask.
// NOTE(review): `const` on the int return type has no effect for callers.
// NOTE(review): case labels, else branches, return statements and the
// assignments on original lines 122 and 130 are not visible in this listing.
103 const int pg_error (pgconn_t
*conn
, pgerror_t
*e
) {
106 if ((conn
->intr_error
& EPG_PORT
)) {
108 e
->msg
= "Illegal port";
111 if ((conn
->intr_error
& EPG_ERRNO
)) {
113 e
->msg
= strerror(conn
->intr_error
& EPG_HOST
);
116 switch ((conn
->intr_error
& EPG_HOST
)) {
118 e
->code
= HOST_NOT_FOUND
;
119 e
->msg
= "The specified host is unknown.";
123 e
->msg
= "The requested name is valid but does not have an IP address. Another type of request to the name server for this domain may return an answer.";
126 e
->code
= NO_RECOVERY
;
127 e
->msg
= "A nonrecoverable name server error occurred.";
131 e
->msg
= "A temporary error occurred on an authoritative name server. Try again later.";
135 e
->sqlstate
= conn
->error
->code
;
136 e
->msg
= conn
->error
->text
;
139 e
->sqlstate
= ERRCODE_SUCCESSFUL_COMPLETION
;
// Performs the startup exchange on an already connected socket.
// A startup message is built from conn_info and sent; replies are then read
// with pgmsg_recv until conn->ready or conn->error becomes non-NULL or
// pgmsg_recv returns non-zero.
// NOTE(review): the `case` labels, `break`s and the action taken when
// pgmsg_send fails (after line 146) are not visible in this listing; the
// per-branch comments below are derived from the fields each branch writes.
// NOTE(review): in the parameter-status and authentication branches no
// lst_adde(conn->msgs, msg) is visible, so ownership of `msg` and `resp` there
// cannot be confirmed from this listing.
144 static void _pg_startup (pgconn_t
*conn
, const char *conn_info
) {
145 pgmsg_t
*msg
= pgmsg_create_startup_params(conn_info
);
146 if (-1 == pgmsg_send(conn
->fd
, msg
))
150 while (!conn
->ready
&& !conn
->error
&& 0 == pgmsg_recv(conn
->fd
, &msg
)) {
151 pgmsg_resp_t
*resp
= NULL
;
152 switch (msg
->body
.type
) {
// Ready reply: the message is kept in conn->msgs and conn->ready points into
// the parsed response.
154 lst_adde(conn
->msgs
, msg
);
155 resp
= pgmsg_parse(msg
);
156 conn
->ready
= &resp
->msg_ready
;
// Error reply: the message is kept in conn->msgs and conn->error points into
// the parsed response.
160 lst_adde(conn
->msgs
, msg
);
161 resp
= pgmsg_parse(msg
);
162 conn
->error
= &resp
->msg_error
;
// Parameter status: selected server settings are mirrored into conn.
// Boolean settings become 1 only for the exact value "on".
166 resp
= pgmsg_parse(msg
);
167 if (0 == strcmp(resp
->msg_param_status
.name
, "integer_datetimes"))
168 conn
->is_int_datetimes
= 0 == strcmp(resp
->msg_param_status
.value
, "on") ? 1 : 0;
170 if (0 == strcmp(resp
->msg_param_status
.name
, "is_superuser"))
171 conn
->is_superuser
= 0 == strcmp(resp
->msg_param_status
.value
, "on") ? 1 : 0;
// server_version: a private copy of the value is located at its first '.' and
// the numeric parts are read with strtol.
// NOTE(review): original lines 176-177 and 179 are not visible. Confirm that
// they guard against strchr returning NULL and that `ver` is freed.
173 if (0 == strcmp(resp
->msg_param_status
.name
, "server_version")) {
174 char *ver
= strdup(resp
->msg_param_status
.value
),
175 *p
= strchr(ver
, '.');
178 conn
->minor_ver
= strtol(p
, NULL
, 0);
180 conn
->major_ver
= strtol(ver
, NULL
, 0);
// String settings are duplicated with strdup; pg_disconnect frees them.
// NOTE(review): a setting reported twice would overwrite the earlier copy
// without freeing it in the visible lines — confirm.
183 if (0 == strcmp(resp
->msg_param_status
.name
, "client_encoding"))
184 conn
->client_encoding
= strdup(resp
->msg_param_status
.value
);
186 if (0 == strcmp(resp
->msg_param_status
.name
, "server_encoding"))
187 conn
->server_encoding
= strdup(resp
->msg_param_status
.value
);
189 if (0 == strcmp(resp
->msg_param_status
.name
, "session_authorization"))
190 conn
->session_auth
= strdup(resp
->msg_param_status
.value
);
191 if (0 == strcmp(resp
->msg_param_status
.name
, "DateStyle"))
192 conn
->date_style
= strdup(resp
->msg_param_status
.value
);
// Authentication reply: the parsed success flag is copied to conn->authok.
198 resp
= pgmsg_parse(msg
);
199 conn
->authok
= resp
->msg_auth
.success
;
// Callback handed to parse_conninfo (see pg_connect) for one item of the
// connection string, delimited by [begin, end). `data` is the struct
// sockaddr_in being filled in.
// The key runs from `begin` up to the first '='; p1 is moved back over
// whitespace before the comparison and p is moved forward over whitespace in
// front of the value.
//   "host": the value is tried as a dotted address with inet_aton and,
//           failing that, resolved with gethostbyname.
//   "port": the value is parsed with strtol (base 0) and stored with htons.
// NOTE(review): original lines 219-221, 223-225, 227-228, 233, 235-239 and
// 245 onward are not visible (initialisation of p1, stepping past '=', error
// flagging through *flags, frees of the strndup copies, return value).
// NOTE(review): line 234 casts host->h_addr_list, which is a `char **`,
// directly to `struct in_addr *`. The address bytes are at h_addr_list[0], so
// the visible statement copies pointer bytes rather than the resolved address.
// Likely intended: *(struct in_addr *)host->h_addr_list[0].
// NOTE(review): strncmp is limited to the key length, so any prefix of "host"
// or "port" (including an empty key) compares equal.
// NOTE(review): isspace() is given a plain char; values above 127 should be
// cast to unsigned char first.
// NOTE(review): errno is not reset before strtol in the visible lines, so a
// stale ERANGE would reject a valid port; `port` is not range-checked against
// USHRT_MAX before htons.
215 static void *_parse_param (void *data
, const char *begin
, const char *end
, unsigned int *flags
) {
216 struct sockaddr_in
*in_addr
= (struct sockaddr_in
*)data
;
217 const char *p
= begin
, *p1
;
218 while (p
< end
&& '=' != *p
) ++p
;
222 while (p1
> begin
&& isspace(*(p1
- 1))) --p1
;
226 while (p
< end
&& isspace(*p
)) ++p
;
229 if (0 == strncmp(begin
, "host", (uintptr_t)p1
- (uintptr_t)begin
)) {
230 char *addr
= strndup(p
, (uintptr_t)end
- (uintptr_t)p
);
231 if (!inet_aton(addr
, &in_addr
->sin_addr
)) {
232 struct hostent
*host
= gethostbyname(addr
);
234 in_addr
->sin_addr
= *(struct in_addr
*)host
->h_addr_list
;
240 if (0 == strncmp(begin
, "port", (uintptr_t)p1
- (uintptr_t)begin
)) {
241 char *port_s
= strndup(p
, (uintptr_t)end
- (uintptr_t)p
), *tail
;
242 int port
= strtol(port_s
, &tail
, 0);
243 if ('\0' == *tail
&& ERANGE
!= errno
)
244 in_addr
->sin_port
= htons(port
);
// Creates a connection object and connects it to the server.
// Steps visible in this listing:
//   1. the target address starts as the file-level defaults (pg_port, pg_addr);
//   2. a zero-filled pgconn_t is allocated and its msgs / row_list lists are
//      created with on_default_free_item as the item destructor;
//   3. conn_info is parsed by parse_conninfo with _parse_param, which may
//      overwrite the address; the resulting flags become conn->intr_error;
//   4. a TCP socket is created and connected; on connect failure intr_error is
//      set to EPG_ERRNO | errno;
//   5. _pg_startup runs the startup exchange.
// NOTE(review): the results of calloc and socket are not checked in the
// visible lines.
// NOTE(review): original lines 252, 254-255, 261, 264, 266-268 and 270 onward
// (local declarations, early exits, storing fd in conn, the return) are not
// visible in this listing.
251 pgconn_t
*pg_connect (const char *conn_info
) {
253 struct sockaddr_in in_addr
= { .sin_family
= AF_INET
, .sin_port
= pg_port
, .sin_addr
.s_addr
= pg_addr
};
256 conn
= calloc(1, sizeof(pgconn_t
));
257 conn
->msgs
= lst_alloc(on_default_free_item
);
258 conn
->row_list
= lst_alloc(on_default_free_item
);
259 parse_conninfo((void*)&in_addr
, conn_info
, _parse_param
, &flags
);
260 if (0 != (conn
->intr_error
= flags
))
262 fd
= socket(AF_INET
, SOCK_STREAM
, 0);
263 if (-1 == connect(fd
, (struct sockaddr
*)&in_addr
, sizeof in_addr
)) {
265 conn
->intr_error
= EPG_ERRNO
| errno
;
269 _pg_startup(conn
, conn_info
);
// Resets the per-statement state of a connection: rowdesc and suspended are
// cleared, conn->complete is freed and cleared, and both the row list and the
// message list are emptied with lst_clear.
// NOTE(review): elsewhere in this file conn->complete is assigned
// `&resp->msg_complete`, the address of a member of a parsed response. Passing
// that to free() is only valid if it coincides with the start of a heap block
// that nothing else frees — confirm against pgmsg_parse and the list item
// destructor.
// NOTE(review): original lines 274-287, 289, 293 and 297 onward are not
// visible in this listing (other fields reset here, closing braces).
273 void _pg_close (pgconn_t
*conn
) {
288 conn
->rowdesc
= NULL
;
290 if (conn
->complete
) {
291 free(conn
->complete
);
292 conn
->complete
= NULL
;
294 conn
->suspended
= NULL
;
295 lst_clear(conn
->row_list
);
296 lst_clear(conn
->msgs
);
// Public entry point for closing the current statement state of `conn`.
// NOTE(review): the body (original lines 301-302) is not visible in this
// listing; presumably it delegates to _pg_close — confirm.
300 void pg_close (pgconn_t
*conn
) {
// Ends the session: a PG_TERM message is sent, the strdup'ed session settings
// (date_style, client_encoding, server_encoding, session_auth) are freed and
// both lists are destroyed with lst_free.
// NOTE(review): the pgmsg_send result is ignored in the visible lines.
// NOTE(review): the `if (ptr)` guards in front of free() are redundant, since
// free(NULL) is a no-op.
// NOTE(review): original lines 307-308 and 319 onward (release of `msg`,
// closing the socket, freeing `conn`) are not visible in this listing.
304 void pg_disconnect (pgconn_t
*conn
) {
305 pgmsg_t
*msg
= pgmsg_create(PG_TERM
);
306 pgmsg_send(conn
->fd
, msg
);
309 if (conn
->date_style
)
310 free(conn
->date_style
);
311 if (conn
->client_encoding
)
312 free(conn
->client_encoding
);
313 if (conn
->server_encoding
)
314 free(conn
->server_encoding
);
315 if (conn
->session_auth
)
316 free(conn
->session_auth
);
317 lst_free(conn
->row_list
);
318 lst_free(conn
->msgs
);
// lst_enum callback used by _set_rows: stores the list item's payload, viewed
// as a pgmsg_datarow_t pointer, into the next free slot of conn->rows and
// advances the running index kept in the _on_row_t passed as userdata.
// Always returns ENUM_CONTINUE.
329 static int _on_set_row (list_item_t
*li
, void *userdata
) {
330 _on_row_t
*r
= (_on_row_t
*)userdata
;
331 r
->conn
->rows
[r
->i
++] = (pgmsg_datarow_t
*)li
->ptr
;
332 return ENUM_CONTINUE
;
// Builds the conn->rows array from the row list: one pointer-sized slot is
// allocated per list item and the slots are filled by enumerating row_list
// with _on_set_row.
// NOTE(review): the malloc result is not checked in the visible lines, and any
// previous conn->rows value is overwritten here without being freed.
335 static void _set_rows (pgconn_t
*conn
) {
336 _on_row_t r
= { .i
= 0, .conn
= conn
};
337 conn
->rows
= malloc(conn
->row_list
->len
* sizeof(void*));
338 lst_enum(conn
->row_list
, _on_set_row
, &r
, 0);
// Reads the replies to a simple query until pgmsg_recv returns non-zero (or a
// hidden branch leaves the loop). In every branch visible here the received
// message is appended to conn->msgs, parsed, and a pointer into the parsed
// response is stored in conn.
// NOTE(review): `case` labels, `break`s, local declarations and the return
// value are not visible in this listing; the branch comments below are derived
// from the fields each branch writes.
341 static int _simple_exec (pgconn_t
*conn
) {
344 while (0 == pgmsg_recv(conn
->fd
, &msg
)) {
346 switch (msg
->body
.type
) {
// Ready reply: stored in conn->ready. When rows were collected, the hidden
// line 352 acts on them (presumably a call to _set_rows — confirm).
348 lst_adde(conn
->msgs
, msg
);
349 resp
= pgmsg_parse(msg
);
350 conn
->ready
= &resp
->msg_ready
;
351 if (conn
->row_list
->len
> 0)
// Error reply: stored in conn->error.
355 lst_adde(conn
->msgs
, msg
);
356 resp
= pgmsg_parse(msg
);
357 conn
->error
= &resp
->msg_error
;
// Row description: stored in conn->rowdesc; the field count is cached.
361 lst_adde(conn
->msgs
, msg
);
362 resp
= pgmsg_parse(msg
);
363 conn
->rowdesc
= &resp
->msg_rowdesc
;
364 conn
->nflds
= conn
->rowdesc
->nflds
;
// Command completion: stored in conn->complete.
367 lst_adde(conn
->msgs
, msg
);
368 resp
= pgmsg_parse(msg
);
369 conn
->complete
= &resp
->msg_complete
;
// Data row: a pointer to the parsed row is appended to conn->row_list.
372 lst_adde(conn
->msgs
, msg
);
373 resp
= pgmsg_parse(msg
);
374 lst_adde(conn
->row_list
, &resp
->msg_datarow
);
// Reads the replies of the extended-query flow (parse / bind / describe /
// execute) until pgmsg_recv returns non-zero (or a hidden branch leaves the
// loop). Received messages are appended to conn->msgs and, where parsed, a
// pointer into the parsed response is stored in conn.
// NOTE(review): most `case` labels, all `break`s, local declarations and the
// return value are not visible in this listing; the branch comments below are
// derived from the fields each branch writes.
389 static int _exec (pgconn_t
*conn
) {
393 while (0 == pgmsg_recv(conn
->fd
, &msg
)) {
395 switch (msg
->body
.type
) {
// Ready reply: stored in conn->ready. When rows were collected, the hidden
// line 401 acts on them (presumably a call to _set_rows — confirm).
397 lst_adde(conn
->msgs
, msg
);
398 resp
= pgmsg_parse(msg
);
399 conn
->ready
= &resp
->msg_ready
;
400 if (conn
->row_list
->len
> 0)
// NOTE(review): the handling of these two acknowledgements (original lines
// 404-405 and 407-408) is not visible in this listing.
403 case PG_PARSECOMPLETE
:
406 case PG_BINDCOMPLETE
:
// Row description: stored in conn->rowdesc; the field count is cached.
410 lst_adde(conn
->msgs
, msg
);
411 resp
= pgmsg_parse(msg
);
412 conn
->rowdesc
= &resp
->msg_rowdesc
;
413 conn
->nflds
= conn
->rowdesc
->nflds
;
// One additional message is read inside this branch and dispatched by a
// nested switch; the visible nested cases record an error or a row description.
417 if (0 == pgmsg_recv(conn
->fd
, &msg
)) {
418 switch (msg
->body
.type
) {
420 lst_adde(conn
->msgs
, msg
);
421 resp
= pgmsg_parse(msg
);
422 conn
->error
= &resp
->msg_error
;
426 lst_adde(conn
->msgs
, msg
);
427 resp
= pgmsg_parse(msg
);
428 conn
->rowdesc
= &resp
->msg_rowdesc
;
429 conn
->nflds
= conn
->rowdesc
->nflds
;
// Error reply: stored in conn->error.
442 lst_adde(conn
->msgs
, msg
);
443 resp
= pgmsg_parse(msg
);
444 conn
->error
= &resp
->msg_error
;
// Command completion: stored in conn->complete.
448 lst_adde(conn
->msgs
, msg
);
449 resp
= pgmsg_parse(msg
);
450 conn
->complete
= &resp
->msg_complete
;
// Data row: a pointer to the parsed row is appended to conn->row_list.
453 lst_adde(conn
->msgs
, msg
);
454 resp
= pgmsg_parse(msg
);
455 lst_adde(conn
->row_list
, &resp
->msg_datarow
);
// Portal suspended: the raw message itself (not a parsed response) is
// remembered in conn->suspended.
461 case PG_PORTALSUSPENDED
:
462 lst_adde(conn
->msgs
, msg
);
463 conn
->suspended
= msg
;
// Empty-query reply: the raw message is remembered in conn->emptyquery.
466 lst_adde(conn
->msgs
, msg
);
467 conn
->emptyquery
= msg
;
// Runs `sql` through the simple-query protocol: a simple-query message is
// built and sent, then the replies are consumed by _simple_exec, whose result
// is stored in rc.
// sql_len may be replaced by strlen(sql).
// NOTE(review): the condition guarding that replacement (original line 482,
// presumably `0 == sql_len`) is not visible — confirm.
// NOTE(review): original lines 479-482, 486-487 and 489 onward (declarations,
// release of `msg`, the check between send and receive, the return) are not
// visible in this listing.
478 int pg_execsql (pgconn_t
*conn
, const char *sql
, size_t sql_len
) {
483 sql_len
= strlen(sql
);
484 msg
= pgmsg_create_simple_query(sql
, sql_len
);
485 rc
= pgmsg_send(conn
->fd
, msg
);
488 rc
= _simple_exec(conn
);
// Sends a PG_SYNC message on the connection; the pgmsg_send result is stored
// in rc.
// NOTE(review): original lines 493 and 496 onward (declaration of rc, release
// of `msg`, the return) are not visible in this listing.
492 static int _pg_sync (pgconn_t
*conn
) {
494 pgmsg_t
*msg
= pgmsg_create(PG_SYNC
);
495 rc
= pgmsg_send(conn
->fd
, msg
);
// Sends a Parse request for `sql`, optionally named `name`, with the parameter
// descriptions in flds[0..fld_len).
// When the send succeeds and `name` is non-NULL, a sync is issued as well; the
// statement guarded by that condition (original line 508) is not visible in
// this listing — presumably the replies are read there; confirm.
// NOTE(review): original lines 501-503, 506 and 508 onward (declarations,
// release of `msg`, the return) are not visible.
500 int pg_prepareln (pgconn_t
*conn
, const char *name
, size_t name_len
, const char *sql
, size_t sql_len
, int fld_len
, pgfld_t
**flds
) {
504 msg
= pgmsg_create_parse(name
, name_len
, sql
, sql_len
, fld_len
, flds
);
505 rc
= pgmsg_send(conn
->fd
, msg
);
507 if (0 == rc
&& name
&& 0 == (rc
= _pg_sync(conn
)))
// Declares pgfld_array_t through the project's DEFINE_ARRAY macro with
// pgfld_ptr_t as its second argument. pg_preparevn and pg_execvn use it (via
// INIT_ARRAY / ARRAY_ADD) to collect variadic field pointers.
// NOTE(review): presumably a growable array of pgfld_ptr_t elements — confirm
// against the macro definition, which is not part of this listing.
512 DEFINE_ARRAY(pgfld_array_t
, pgfld_ptr_t
);
// va_list core of the variadic prepare functions: collects `fld` and every
// further pgfld_ptr_t taken from `ap` up to the first NULL into a
// pgfld_array_t, then forwards to pg_prepareln with the array's length and
// element pointer.
// NOTE(review): original lines 515-516, 518, 522-523 and 525 onward are not
// visible (declarations, a possible guard for a NULL first `fld`, release of
// the array, the return).
514 static int pg_preparevn (pgconn_t
*conn
, const char *name
, size_t name_len
, const char *sql
, size_t sql_len
, pgfld_t
*fld
, va_list ap
) {
517 INIT_ARRAY(pgfld_array_t
, flds
, 8, 8, NULL
);
519 ARRAY_ADD(flds
, fld
);
520 while (NULL
!= (fld
= va_arg(ap
, pgfld_ptr_t
))) {
521 ARRAY_ADD(flds
, fld
);
524 rc
= pg_prepareln(conn
, name
, name_len
, sql
, sql_len
, flds
->len
, flds
->ptr
);
// Variadic front end for preparing a named statement: forwards its fixed
// arguments and the variadic field list to pg_preparevn.
// The loop in pg_preparevn stops at the first NULL, so callers must end the
// field list with a NULL pointer.
// NOTE(review): original lines 530-532 and 534 onward (va_start / va_end, the
// return) are not visible in this listing.
529 int pg_preparen (pgconn_t
*conn
, const char *name
, size_t name_len
, const char *sql
, size_t sql_len
, pgfld_t
*fld
, ...) {
533 rc
= pg_preparevn(conn
, name
, name_len
, sql
, sql_len
, fld
, ap
);
// Variadic front end for preparing a statement without an explicit name:
// CONST_STR_NULL is passed where pg_preparevn expects name and name_len.
// NOTE(review): the call lists six arguments for a seven-parameter function,
// so CONST_STR_NULL presumably expands to a `pointer, length` pair — confirm
// against its definition.
// NOTE(review): original lines 539-541 and 543 onward (va_start / va_end, the
// return) are not visible in this listing.
538 int pg_prepare (pgconn_t
*conn
, const char *sql
, size_t sql_len
, pgfld_t
*fld
, ...) {
542 rc
= pg_preparevn(conn
, CONST_STR_NULL
, sql
, sql_len
, fld
, ap
);
// Builds a Bind message tying statement `stmt` to portal `portal`, with the
// parameter values in flds[0..fld_len) and the result format codes in
// res_fmt[0..res_fmt_len), and sends it. The pgmsg_send result is stored in rc.
// NOTE(review): original lines 549-551 and 554 onward (declarations, release
// of `msg`, the return) are not visible in this listing.
547 static int _pg_bind (pgconn_t
*conn
, const char *portal
, size_t portal_len
, const char *stmt
, size_t stmt_len
,
548 int fld_len
, pgfld_t
**flds
, int res_fmt_len
, int *res_fmt
) {
552 msg
= pgmsg_create_bind(portal
, portal_len
, stmt
, stmt_len
, fld_len
, flds
, res_fmt_len
, res_fmt
);
553 rc
= pgmsg_send(conn
->fd
, msg
);
// Builds a Describe message for the object named `portal`, with `op` selecting
// the kind of object, and sends it. The pgmsg_send result is stored in rc.
// NOTE(review): original lines 559-560 and 563 onward (declarations, release
// of `msg`, the return) are not visible in this listing.
558 static int _pg_describe (pgconn_t
*conn
, int8_t op
, const char *portal
, size_t portal_len
) {
561 msg
= pgmsg_create_describe(op
, portal
, portal_len
);
562 rc
= pgmsg_send(conn
->fd
, msg
);
// Builds an Execute message for `portal` limited to max_rows rows and sends
// it. When the send succeeds a sync is issued as well.
// NOTE(review): the statement guarded by the condition on line 573 (original
// line 574) is not visible; presumably the replies are read there via _exec —
// confirm.
// NOTE(review): original lines 568-569, 572 and 574 onward (declarations,
// release of `msg`, the return) are not visible in this listing.
567 static int _pg_execute (pgconn_t
*conn
, const char *portal
, size_t portal_len
, int max_rows
) {
570 msg
= pgmsg_create_execute(portal
, portal_len
, max_rows
);
571 rc
= pgmsg_send(conn
->fd
, msg
);
573 if (0 == rc
&& 0 == (rc
= _pg_sync(conn
)))
// Executes a prepared statement through a portal in three steps: bind, then
// describe the portal (PG_PREPARED_PORTAL), then execute with `max_data` as
// the row limit. Each step runs only if the previous one returned 0; rc holds
// the result of the last step attempted.
// NOTE(review): original lines 580 and 584 onward (declaration of rc, the
// return) are not visible in this listing.
578 int pg_execln (pgconn_t
*conn
, const char *portal
, size_t portal_len
, const char *stmt
, size_t stmt_len
,
579 int fld_len
, pgfld_t
**flds
, int res_fmt_len
, int *res_fmt
, int max_data
) {
581 if (0 == (rc
= _pg_bind(conn
, portal
, portal_len
, stmt
, stmt_len
, fld_len
, flds
, res_fmt_len
, res_fmt
)) &&
582 0 == (rc
= _pg_describe(conn
, PG_PREPARED_PORTAL
, portal
, portal_len
)))
583 rc
= _pg_execute(conn
, portal
, portal_len
, max_data
);
// Issues another Execute on an existing portal with `max_data` as the row
// limit and returns _pg_execute's result unchanged.
// NOTE(review): presumably used to fetch the next batch after a
// portal-suspended reply — confirm with callers.
587 int pg_nextn (pgconn_t
*conn
, const char *portal
, size_t portal_len
, int max_data
) {
588 return _pg_execute(conn
, portal
, portal_len
, max_data
);
// va_list core of the variadic execute functions: collects `fld` and every
// further pgfld_ptr_t taken from `ap` up to the first NULL into a
// pgfld_array_t, then calls pg_execln with that array and a single result
// format code (the address of out_fmt, count 1).
// NOTE(review): unlike pg_preparevn this function is not declared static.
// NOTE(review): original lines 592-593, 595, 599-600 and 602 onward are not
// visible (declarations, a possible guard for a NULL first `fld`, release of
// the array, the return).
591 int pg_execvn (pgconn_t
*conn
, const char *portal
, size_t portal_len
, const char *stmt
, size_t stmt_len
, int max_data
, int out_fmt
, pgfld_t
*fld
, va_list ap
) {
594 INIT_ARRAY(pgfld_array_t
, flds
, 8, 8, NULL
);
596 ARRAY_ADD(flds
, fld
);
597 while (NULL
!= (fld
= va_arg(ap
, pgfld_ptr_t
))) {
598 ARRAY_ADD(flds
, fld
);
601 rc
= pg_execln(conn
, portal
, portal_len
, stmt
, stmt_len
, flds
->len
, flds
->ptr
, 1, &out_fmt
, max_data
);
// Variadic front end for executing a named statement through a named portal:
// forwards its fixed arguments and the variadic field list to pg_execvn.
// The loop in pg_execvn stops at the first NULL, so callers must end the field
// list with a NULL pointer.
// NOTE(review): original lines 607-609 and 611 onward (va_start / va_end, the
// return) are not visible in this listing.
606 int pg_execn (pgconn_t
*conn
, const char *portal
, size_t portal_len
, const char *stmt
, size_t stmt_len
, int max_data
, int out_fmt
, pgfld_t
*fld
, ...) {
610 rc
= pg_execvn(conn
, portal
, portal_len
, stmt
, stmt_len
, max_data
, out_fmt
, fld
, ap
);
// Variadic front end for executing without explicit portal or statement
// names: CONST_STR_NULL is passed for both the portal and the statement.
// NOTE(review): the call lists seven arguments for a nine-parameter function,
// so each CONST_STR_NULL presumably expands to a `pointer, length` pair —
// confirm against its definition.
// NOTE(review): original lines 616-618 and 620 onward (va_start / va_end, the
// return) are not visible in this listing.
615 int pg_exec (pgconn_t
*conn
, int max_data
, int out_fmt
, pgfld_t
*fld
, ...) {
619 rc
= pg_execvn(conn
, CONST_STR_NULL
, CONST_STR_NULL
, max_data
, out_fmt
, fld
, ap
);
624 int pg_release (pgconn_t
*conn
, const char *name
, size_t name_len
) {
625 pgmsg_t
*msg
= pgmsg_create_close(PG_OPNAME
, name
, 0 == name_len
? strlen(name
) : name_len
);
626 int rc
= pgmsg_send(conn
->fd
, msg
);