/*
 * fix
 * [libpgclient.git] / src / pgconn.c
 * blob a5951b69f78deacc9c90b3d2532beacc3bdefe28
 */
1 #include "libpgcli/pgconn.h"
// 8 KiB constant (not referenced elsewhere in this file). The expansion is
// parenthesized so it stays a single operand inside larger expressions such
// as `x / PG_STACKSIZE`.
#define PG_STACKSIZE (1024 * 8)
// Default PostgreSQL TCP port, host byte order.
#define PG_DEFAULT_PORT 5432

// Module-wide default server endpoint used by pg_connect() when the
// connection string does not override it. Both values are kept in network
// byte order (see _pg_init / pgconn_init).
static uint32_t pg_addr = 0;
static unsigned short pg_port = 0;
//******************************************************************************
// Init
//******************************************************************************
12 __attribute ((constructor))
13 static void _pg_init () {
14 struct in_addr addr;
15 inet_aton("127.0.0.1", &addr);
16 pg_addr = addr.s_addr;
17 pg_port = htons(PG_DEFAULT_PORT);
// Unload-time hook (GCC destructor attribute). Currently does nothing.
__attribute__ ((destructor))
static void _pg_done () {
}
// Converts a service name or numeric string into a TCP port.
//
// service: a name resolvable through getservbyname(..., "tcp") or a number
//          accepted by strtol() with base 0 (decimal, 0x hex, leading-0 octal).
// Returns the port in network byte order, or -1 when the string is neither a
// known service nor a number in the range 1..USHRT_MAX.
static int _pg_atoport (const char *service) {
    struct servent *servent = getservbyname(service, "tcp");
    char *tail;
    long port;

    if (servent)
        return servent->s_port;     // already in network byte order
    port = strtol(service, &tail, 0);
    // Reject empty input, trailing garbage and out-of-range values. The range
    // test is done on the long so overflowing input cannot wrap into range.
    if (tail == service || '\0' != *tail || port < 1 || port > USHRT_MAX)
        return -1;
    return htons((unsigned short)port);
}
36 void pgconn_init (const char *pg_srv_addr, const char *pg_srv_service) {
37 struct in_addr addr;
38 if (pg_srv_service)
39 pg_port = _pg_atoport(pg_srv_service);
40 if (pg_srv_addr)
41 inet_aton(pg_srv_addr, &addr);
42 pg_addr = addr.s_addr;
//******************************************************************************
// Data
//******************************************************************************
// Get
//******************************************************************************
50 const strptr_t pg_get (pgconn_t *conn, int row, int col) {
51 strptr_t res = { .ptr = NULL, .len = 0 };
52 assert(CHECK_BOUNDS(conn, row, col));
53 res.ptr = (char*)PG_DATA(conn, row, col);
54 res.len = PG_FLDLEN(conn, row, col);
55 return res;
58 strptr_t pg_getstr (pgconn_t *conn, int row, int col) {
59 strptr_t s = pg_get(conn, row, col), res = { .ptr = NULL, .len = 0 };
60 if (s.len > 0) {
61 res.ptr = strndup(s.ptr, s.len);
62 res.len = s.len;
64 return res;
67 uint32_t pg_getx32 (pgconn_t *conn, int row, int col) {
68 assert(CHECK_BOUNDS(conn, row, col));
69 pg_bit32_t *res = (pg_bit32_t*)PG_DATA(conn, row, col);
70 return be32toh(res->bit);
73 pg_intv_t pg_getintv (pgconn_t *conn, int row, int col) {
74 pg_intv_t *i, res = { .time = 0, .day = 0, .month = 0 };
75 assert(CHECK_BOUNDS(conn, row, col));
76 i = (pg_intv_t*)PG_DATA(conn, row, col),
77 res.time = be64toh(i->time);
78 res.day = be32toh(i->day);
79 res.month = be32toh(i->month);
80 return res;
83 typedef union {
84 unsigned long l;
85 unsigned char a [4];
86 } pg_inet4_t;
87 int32_t pg_getinet4 (pgconn_t *conn, int row, int col) {
88 uint8_t *v = PG_DATA(conn, row, col);
89 if (2 == v[0] && 32 == v[1] && 4 == *(unsigned short*)(v+2)) {
90 pg_inet4_t i;
91 i.a[0] = v[7];
92 i.a[1] = v[6];
93 i.a[2] = v[5];
94 i.a[3] = v[4];
95 return be32toh(i.l);
97 return 0;
//******************************************************************************
//******************************************************************************
103 const int pg_error (pgconn_t *conn, pgerror_t *e) {
104 e->code = 0;
105 e->sqlstate = NULL;
106 if ((conn->intr_error & EPG_PORT)) {
107 e->code = EPG_PORT;
108 e->msg = "Illegal port";
109 return -1;
111 if ((conn->intr_error & EPG_ERRNO)) {
112 e->code = EPG_ERRNO;
113 e->msg = strerror(conn->intr_error & EPG_HOST);
114 return -1;
116 switch ((conn->intr_error & EPG_HOST)) {
117 case HOST_NOT_FOUND:
118 e->code = HOST_NOT_FOUND;
119 e->msg = "The specified host is unknown.";
120 return -1;
121 case NO_DATA:
122 e->code = NO_DATA;
123 e->msg = "The requested name is valid but does not have an IP address. Another type of request to the name server for this domain may return an answer.";
124 return -1;
125 case NO_RECOVERY:
126 e->code = NO_RECOVERY;
127 e->msg = "A nonrecoverable name server error occurred.";
128 return -1;
129 case TRY_AGAIN:
130 e->code = TRY_AGAIN;
131 e->msg = "A temporary error occurred on an authoritative name server. Try again later.";
132 return -1;
134 if (conn->error) {
135 e->sqlstate = conn->error->code;
136 e->msg = conn->error->text;
137 return -1;
139 e->sqlstate = ERRCODE_SUCCESSFUL_COMPLETION;
140 e->msg = "Success";
141 return 0;
144 static void _pg_startup (pgconn_t *conn, const char *conn_info) {
145 pgmsg_t *msg = pgmsg_create_startup_params(conn_info);
146 if (-1 == pgmsg_send(conn->fd, msg))
147 goto done;
148 free(msg);
149 msg = NULL;
150 while (!conn->ready && !conn->error && 0 == pgmsg_recv(conn->fd, &msg)) {
151 pgmsg_resp_t *resp = NULL;
152 switch (msg->body.type) {
153 case PG_READY:
154 lst_adde(conn->msgs, msg);
155 resp = pgmsg_parse(msg);
156 conn->ready = &resp->msg_ready;
157 msg = NULL;
158 break;
159 case PG_ERROR:
160 lst_adde(conn->msgs, msg);
161 resp = pgmsg_parse(msg);
162 conn->error = &resp->msg_error;
163 msg = NULL;
164 break;
165 case PG_PARAMSTATUS:
166 resp = pgmsg_parse(msg);
167 if (0 == strcmp(resp->msg_param_status.name, "integer_datetimes"))
168 conn->is_int_datetimes = 0 == strcmp(resp->msg_param_status.value, "on") ? 1 : 0;
169 else
170 if (0 == strcmp(resp->msg_param_status.name, "is_superuser"))
171 conn->is_superuser = 0 == strcmp(resp->msg_param_status.value, "on") ? 1 : 0;
172 else
173 if (0 == strcmp(resp->msg_param_status.name, "server_version")) {
174 char *ver = strdup(resp->msg_param_status.value),
175 *p = strchr(ver, '.');
176 if (p) {
177 *p++ = '\0';
178 conn->minor_ver = strtol(p, NULL, 0);
180 conn->major_ver = strtol(ver, NULL, 0);
181 free(ver);
182 } else
183 if (0 == strcmp(resp->msg_param_status.name, "client_encoding"))
184 conn->client_encoding = strdup(resp->msg_param_status.value);
185 else
186 if (0 == strcmp(resp->msg_param_status.name, "server_encoding"))
187 conn->server_encoding = strdup(resp->msg_param_status.value);
188 else
189 if (0 == strcmp(resp->msg_param_status.name, "session_authorization"))
190 conn->session_auth = strdup(resp->msg_param_status.value);
191 if (0 == strcmp(resp->msg_param_status.name, "DateStyle"))
192 conn->date_style = strdup(resp->msg_param_status.value);
193 free(resp);
194 free(msg);
195 msg = NULL;
196 break;
197 case PG_AUTHOK:
198 resp = pgmsg_parse(msg);
199 conn->authok = resp->msg_auth.success;
200 free(resp);
201 free(msg);
202 msg = NULL;
203 break;
204 default:
205 free(msg);
206 msg = NULL;
207 break;
210 done:
211 if (msg)
212 free(msg);
215 static void *_parse_param (void *data, const char *begin, const char *end, unsigned int *flags) {
216 struct sockaddr_in *in_addr = (struct sockaddr_in*)data;
217 const char *p = begin, *p1;
218 while (p < end && '=' != *p) ++p;
219 if (p == end)
220 return data;
221 p1 = p;
222 while (p1 > begin && isspace(*(p1 - 1))) --p1;
223 if (p == p1 - 1)
224 return data;
225 ++p;
226 while (p < end && isspace(*p)) ++p;
227 if (p == end)
228 return data;
229 if (0 == strncmp(begin, "host", (uintptr_t)p1 - (uintptr_t)begin)) {
230 char *addr = strndup(p, (uintptr_t)end - (uintptr_t)p);
231 if (!inet_aton(addr, &in_addr->sin_addr)) {
232 struct hostent *host = gethostbyname(addr);
233 if (host)
234 in_addr->sin_addr = *(struct in_addr*)host->h_addr_list;
235 else
236 *flags |= h_errno;
238 free(addr);
239 } else
240 if (0 == strncmp(begin, "port", (uintptr_t)p1 - (uintptr_t)begin)) {
241 char *port_s = strndup(p, (uintptr_t)end - (uintptr_t)p), *tail;
242 int port = strtol(port_s, &tail, 0);
243 if ('\0' == *tail && ERANGE != errno)
244 in_addr->sin_port = htons(port);
245 else
246 *flags |= EPG_PORT;
248 return data;
251 pgconn_t *pg_connect (const char *conn_info) {
252 pgconn_t *conn;
253 struct sockaddr_in in_addr = { .sin_family = AF_INET, .sin_port = pg_port, .sin_addr.s_addr = pg_addr };
254 int fd;
255 uint32_t flags = 0;
256 conn = calloc(1, sizeof(pgconn_t));
257 conn->msgs = lst_alloc(on_default_free_item);
258 conn->row_list = lst_alloc(on_default_free_item);
259 parse_conninfo((void*)&in_addr, conn_info, _parse_param, &flags);
260 if (0 != (conn->intr_error = flags))
261 return conn;
262 fd = socket(AF_INET, SOCK_STREAM, 0);
263 if (-1 == connect(fd, (struct sockaddr*)&in_addr, sizeof in_addr)) {
264 close(fd);
265 conn->intr_error = EPG_ERRNO | errno;
266 return conn;
268 conn->fd = fd;
269 _pg_startup(conn, conn_info);
270 return conn;
273 void _pg_close (pgconn_t *conn) {
274 if (conn->rows) {
275 free(conn->rows);
276 conn->rows = NULL;
278 if (conn->ready) {
279 free(conn->ready);
280 conn->ready = NULL;
282 if (conn->error) {
283 free(conn->error);
284 conn->error = NULL;
286 if (conn->rowdesc) {
287 free(conn->rowdesc);
288 conn->rowdesc = NULL;
290 if (conn->complete) {
291 free(conn->complete);
292 conn->complete = NULL;
294 conn->suspended = NULL;
295 lst_clear(conn->row_list);
296 lst_clear(conn->msgs);
297 conn->nflds = 0;
300 void pg_close (pgconn_t *conn) {
301 _pg_close(conn);
304 void pg_disconnect (pgconn_t *conn) {
305 pgmsg_t *msg = pgmsg_create(PG_TERM);
306 pgmsg_send(conn->fd, msg);
307 free(msg);
308 _pg_close(conn);
309 if (conn->date_style)
310 free(conn->date_style);
311 if (conn->client_encoding)
312 free(conn->client_encoding);
313 if (conn->server_encoding)
314 free(conn->server_encoding);
315 if (conn->session_auth)
316 free(conn->session_auth);
317 lst_free(conn->row_list);
318 lst_free(conn->msgs);
319 if (conn->fd > 0)
320 close(conn->fd);
321 free(conn);
324 typedef struct {
325 int i;
326 pgconn_t *conn;
327 } _on_row_t;
329 static int _on_set_row (list_item_t *li, void *userdata) {
330 _on_row_t *r = (_on_row_t*)userdata;
331 r->conn->rows[r->i++] = (pgmsg_datarow_t*)li->ptr;
332 return ENUM_CONTINUE;
335 static void _set_rows (pgconn_t *conn) {
336 _on_row_t r = { .i = 0, .conn = conn };
337 conn->rows = malloc(conn->row_list->len * sizeof(void*));
338 lst_enum(conn->row_list, _on_set_row, &r, 0);
341 static int _simple_exec (pgconn_t *conn) {
342 int rc = 0;
343 pgmsg_t *msg;
344 while (0 == pgmsg_recv(conn->fd, &msg)) {
345 pgmsg_resp_t *resp;
346 switch (msg->body.type) {
347 case PG_READY:
348 lst_adde(conn->msgs, msg);
349 resp = pgmsg_parse(msg);
350 conn->ready = &resp->msg_ready;
351 if (conn->row_list->len > 0)
352 _set_rows(conn);
353 goto done;
354 case PG_ERROR:
355 lst_adde(conn->msgs, msg);
356 resp = pgmsg_parse(msg);
357 conn->error = &resp->msg_error;
358 rc = -1;
359 goto done;
360 case PG_ROWDESC:
361 lst_adde(conn->msgs, msg);
362 resp = pgmsg_parse(msg);
363 conn->rowdesc = &resp->msg_rowdesc;
364 conn->nflds = conn->rowdesc->nflds;
365 break;
366 case PG_CMDCOMPLETE:
367 lst_adde(conn->msgs, msg);
368 resp = pgmsg_parse(msg);
369 conn->complete = &resp->msg_complete;
370 break;
371 case PG_DATAROW:
372 lst_adde(conn->msgs, msg);
373 resp = pgmsg_parse(msg);
374 lst_adde(conn->row_list, &resp->msg_datarow);
375 break;
376 case PG_NODATA:
377 free(msg);
378 conn->nflds = 0;
379 break;
380 default:
381 free(msg);
382 break;
385 done:
386 return rc;
389 static int _exec (pgconn_t *conn) {
390 int rc = 0;
391 pgmsg_t *msg;
392 _pg_close(conn);
393 while (0 == pgmsg_recv(conn->fd, &msg)) {
394 pgmsg_resp_t *resp;
395 switch (msg->body.type) {
396 case PG_READY:
397 lst_adde(conn->msgs, msg);
398 resp = pgmsg_parse(msg);
399 conn->ready = &resp->msg_ready;
400 if (conn->row_list->len > 0)
401 _set_rows(conn);
402 goto done;
403 case PG_PARSECOMPLETE:
404 free(msg);
405 break;
406 case PG_BINDCOMPLETE:
407 free(msg);
408 break;
409 case PG_ROWDESC:
410 lst_adde(conn->msgs, msg);
411 resp = pgmsg_parse(msg);
412 conn->rowdesc = &resp->msg_rowdesc;
413 conn->nflds = conn->rowdesc->nflds;
414 break;
415 case PG_PARAMDESC:
416 free(msg);
417 if (0 == pgmsg_recv(conn->fd, &msg)) {
418 switch (msg->body.type) {
419 case PG_ERROR:
420 lst_adde(conn->msgs, msg);
421 resp = pgmsg_parse(msg);
422 conn->error = &resp->msg_error;
423 rc = -1;
424 break;
425 case PG_ROWDESC:
426 lst_adde(conn->msgs, msg);
427 resp = pgmsg_parse(msg);
428 conn->rowdesc = &resp->msg_rowdesc;
429 conn->nflds = conn->rowdesc->nflds;
430 break;
431 case PG_NODATA:
432 free(msg);
433 conn->nflds = 0;
434 break;
435 default:
436 free(msg);
437 break;
440 // ins
441 case PG_ERROR:
442 lst_adde(conn->msgs, msg);
443 resp = pgmsg_parse(msg);
444 conn->error = &resp->msg_error;
445 rc = -1;
446 goto done;
447 case PG_CMDCOMPLETE:
448 lst_adde(conn->msgs, msg);
449 resp = pgmsg_parse(msg);
450 conn->complete = &resp->msg_complete;
451 break;
452 case PG_DATAROW:
453 lst_adde(conn->msgs, msg);
454 resp = pgmsg_parse(msg);
455 lst_adde(conn->row_list, &resp->msg_datarow);
456 break;
457 case PG_NODATA:
458 free(msg);
459 conn->nflds = 0;
460 break;
461 case PG_PORTALSUSPENDED:
462 lst_adde(conn->msgs, msg);
463 conn->suspended = msg;
464 break;
465 case PG_EMPTYQUERY:
466 lst_adde(conn->msgs, msg);
467 conn->emptyquery = msg;
468 break;
469 default:
470 free(msg);
471 break;
474 done:
475 return rc;
478 int pg_execsql (pgconn_t *conn, const char *sql, size_t sql_len) {
479 int rc;
480 pgmsg_t *msg;
481 _pg_close(conn);
482 if (0 == sql_len)
483 sql_len = strlen(sql);
484 msg = pgmsg_create_simple_query(sql, sql_len);
485 rc = pgmsg_send(conn->fd, msg);
486 free(msg);
487 if (0 == rc)
488 rc = _simple_exec(conn);
489 return rc;
492 static int _pg_sync (pgconn_t *conn) {
493 int rc;
494 pgmsg_t *msg = pgmsg_create(PG_SYNC);
495 rc = pgmsg_send(conn->fd, msg);
496 free(msg);
497 return rc;
500 int pg_prepareln (pgconn_t *conn, const char *name, size_t name_len, const char *sql, size_t sql_len, int fld_len, pgfld_t **flds) {
501 int rc;
502 pgmsg_t *msg;
503 _pg_close(conn);
504 msg = pgmsg_create_parse(name, name_len, sql, sql_len, fld_len, flds);
505 rc = pgmsg_send(conn->fd, msg);
506 free(msg);
507 if (0 == rc && name && 0 == (rc = _pg_sync(conn)))
508 rc = _exec(conn);
509 return rc;
512 DEFINE_ARRAY(pgfld_array_t, pgfld_ptr_t);
514 static int pg_preparevn (pgconn_t *conn, const char *name, size_t name_len, const char *sql, size_t sql_len, pgfld_t *fld, va_list ap) {
515 int rc;
516 pgfld_array_t *flds;
517 INIT_ARRAY(pgfld_array_t, flds, 8, 8, NULL);
518 if (fld) {
519 ARRAY_ADD(flds, fld);
520 while (NULL != (fld = va_arg(ap, pgfld_ptr_t))) {
521 ARRAY_ADD(flds, fld);
524 rc = pg_prepareln(conn, name, name_len, sql, sql_len, flds->len, flds->ptr);
525 free(flds);
526 return rc;
529 int pg_preparen (pgconn_t *conn, const char *name, size_t name_len, const char *sql, size_t sql_len, pgfld_t *fld, ...) {
530 int rc;
531 va_list ap;
532 va_start(ap, fld);
533 rc = pg_preparevn(conn, name, name_len, sql, sql_len, fld, ap);
534 va_end(ap);
535 return rc;
538 int pg_prepare (pgconn_t *conn, const char *sql, size_t sql_len, pgfld_t *fld, ...) {
539 int rc;
540 va_list ap;
541 va_start(ap, fld);
542 rc = pg_preparevn(conn, CONST_STR_NULL, sql, sql_len, fld, ap);
543 va_end(ap);
544 return rc;
547 static int _pg_bind (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len,
548 int fld_len, pgfld_t **flds, int res_fmt_len, int *res_fmt) {
549 int rc;
550 pgmsg_t *msg;
551 _pg_close(conn);
552 msg = pgmsg_create_bind(portal, portal_len, stmt, stmt_len, fld_len, flds, res_fmt_len, res_fmt);
553 rc = pgmsg_send(conn->fd, msg);
554 free(msg);
555 return rc;
558 static int _pg_describe (pgconn_t *conn, int8_t op, const char *portal, size_t portal_len) {
559 int rc;
560 pgmsg_t *msg;
561 msg = pgmsg_create_describe(op, portal, portal_len);
562 rc = pgmsg_send(conn->fd, msg);
563 free(msg);
564 return rc;
567 static int _pg_execute (pgconn_t *conn, const char *portal, size_t portal_len, int max_rows) {
568 int rc;
569 pgmsg_t *msg;
570 msg = pgmsg_create_execute(portal, portal_len, max_rows);
571 rc = pgmsg_send(conn->fd, msg);
572 free(msg);
573 if (0 == rc && 0 == (rc = _pg_sync(conn)))
574 rc = _exec(conn);
575 return rc;
578 int pg_execln (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len,
579 int fld_len, pgfld_t **flds, int res_fmt_len, int *res_fmt, int max_data) {
580 int rc;
581 if (0 == (rc = _pg_bind(conn, portal, portal_len, stmt, stmt_len, fld_len, flds, res_fmt_len, res_fmt)) &&
582 0 == (rc = _pg_describe(conn, PG_PREPARED_PORTAL, portal, portal_len)))
583 rc = _pg_execute(conn, portal, portal_len, max_data);
584 return rc;
587 int pg_nextn (pgconn_t *conn, const char *portal, size_t portal_len, int max_data) {
588 return _pg_execute(conn, portal, portal_len, max_data);
591 int pg_execvn (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len, int max_data, int out_fmt, pgfld_t *fld, va_list ap) {
592 int rc;
593 pgfld_array_t *flds;
594 INIT_ARRAY(pgfld_array_t, flds, 8, 8, NULL);
595 if (fld) {
596 ARRAY_ADD(flds, fld);
597 while (NULL != (fld = va_arg(ap, pgfld_ptr_t))) {
598 ARRAY_ADD(flds, fld);
601 rc = pg_execln(conn, portal, portal_len, stmt, stmt_len, flds->len, flds->ptr, 1, &out_fmt, max_data);
602 free(flds);
603 return rc;
606 int pg_execn (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len, int max_data, int out_fmt, pgfld_t *fld, ...) {
607 int rc;
608 va_list ap;
609 va_start(ap, fld);
610 rc = pg_execvn(conn, portal, portal_len, stmt, stmt_len, max_data, out_fmt, fld, ap);
611 va_end(ap);
612 return rc;
615 int pg_exec (pgconn_t *conn, int max_data, int out_fmt, pgfld_t *fld, ...) {
616 int rc;
617 va_list ap;
618 va_start(ap, fld);
619 rc = pg_execvn(conn, CONST_STR_NULL, CONST_STR_NULL, max_data, out_fmt, fld, ap);
620 va_end(ap);
621 return rc;
624 int pg_release (pgconn_t *conn, const char *name, size_t name_len) {
625 pgmsg_t *msg = pgmsg_create_close(PG_OPNAME, name, 0 == name_len ? strlen(name) : name_len);
626 int rc = pgmsg_send(conn->fd, msg);
627 free(msg);
628 return rc;