fix
[libpgclient.git] / src / pgconn.c
blob5973eb79bb785941b9ac3ea8cdd7052d97814e81
1 /*Copyright (c) Brian B.
3 This library is free software; you can redistribute it and/or
4 modify it under the terms of the GNU Lesser General Public
5 License as published by the Free Software Foundation; either
6 version 3 of the License, or (at your option) any later version.
7 See the file LICENSE included with this distribution for more
8 information.
9 */
10 #include "hmac.h"
11 #include "libpgcli/pgconn.h"
13 #define PG_STACKSIZE 1024 * 8
14 #define PG_DEFAULT_PORT 5432
16 static uint32_t pg_addr = 0;
17 static unsigned short pg_port = 0;
19 //******************************************************************************
20 // Init
21 //******************************************************************************
22 #ifdef __GNUC__
23 __attribute ((constructor))
24 static void pg_init () {
25 #else
26 void pg_init () {
27 #endif
28 struct in_addr addr;
29 inet_aton("127.0.0.1", &addr);
30 pg_addr = addr.s_addr;
31 pg_port = htons(PG_DEFAULT_PORT);
34 #ifdef __GNUC__
35 __attribute__ ((destructor))
36 static void _pg_done () {
38 #endif
40 static int _pg_atoport (const char *service) {
41 struct servent *servent = getservbyname(service, "tcp");
42 int port;
43 char *tail;
44 if (servent)
45 return servent->s_port;
46 port = strtol(service, &tail, 0);
47 if ('\0' == *tail || port < 1 || port > USHRT_MAX)
48 return -1;
49 return htons(port);
52 void pgconn_init (const char *pg_srv_addr, const char *pg_srv_service) {
53 struct in_addr addr;
54 if (pg_srv_service)
55 pg_port = _pg_atoport(pg_srv_service);
56 if (pg_srv_addr)
57 inet_aton(pg_srv_addr, &addr);
58 pg_addr = addr.s_addr;
61 //******************************************************************************
62 // Data
63 //******************************************************************************
64 // Get
65 //******************************************************************************
66 #ifdef __GNUC__
67 const strptr_t pg_get (pgconn_t *conn, int row, int col) {
68 #else
69 strptr_t pg_get (pgconn_t *conn, int row, int col) {
70 #endif
71 strptr_t res = { .ptr = NULL, .len = 0 };
72 assert(CHECK_BOUNDS(conn, row, col));
73 res.ptr = (char*)PG_DATA(conn, row, col);
74 res.len = PG_FLDLEN(conn, row, col);
75 return res;
78 cstr_t *pg_getstr (pgconn_t *conn, int row, int col) {
79 strptr_t s = pg_get(conn, row, col);
80 cstr_t *res = NULL;
81 if (s.len > 0)
82 res = mkcstr(s.ptr, s.len);
83 return res;
86 uint32_t pg_getx32 (pgconn_t *conn, int row, int col) {
87 assert(CHECK_BOUNDS(conn, row, col));
88 pg_bit32_t *res = (pg_bit32_t*)PG_DATA(conn, row, col);
89 return be32toh(res->bit);
92 pg_intv_t pg_getintv (pgconn_t *conn, int row, int col) {
93 pg_intv_t *i, res = { .time = 0, .day = 0, .month = 0 };
94 assert(CHECK_BOUNDS(conn, row, col));
95 i = (pg_intv_t*)PG_DATA(conn, row, col),
96 res.time = be64toh(i->time);
97 res.day = be32toh(i->day);
98 res.month = be32toh(i->month);
99 return res;
102 typedef union {
103 unsigned long l;
104 unsigned char a [4];
105 } pg_inet4_t;
106 int32_t pg_getinet4 (pgconn_t *conn, int row, int col) {
107 uint8_t *v = PG_DATA(conn, row, col);
108 if (2 == v[0] && 32 == v[1] && 4 == *(unsigned short*)(v+2)) {
109 pg_inet4_t i;
110 i.a[0] = v[7];
111 i.a[1] = v[6];
112 i.a[2] = v[5];
113 i.a[3] = v[4];
114 return be32toh(i.l);
116 return 0;
119 //******************************************************************************
121 //******************************************************************************
122 #ifdef __GNUC__
123 const int pg_error (pgconn_t *conn, pgerror_t *e) {
124 #else
125 int pg_error (pgconn_t *conn, pgerror_t *e) {
126 #endif
127 e->code = NULL;
128 e->text = NULL;
129 e->msg = NULL;
130 if ((conn->intr_error & EPG_PORT)) {
131 e->code = STR(EPG_PORT);
132 e->text = "ERROR";
133 e->msg = "Illegal port";
134 return -1;
136 if ((conn->intr_error & EPG_ERRNO)) {
137 e->code = STR(EPG_ERRNO);
138 e->text = "ERROR";
139 e->msg = strerror(conn->intr_error & EPG_HOST);
140 return -1;
142 if ((conn->intr_error & EPG_PROTO)) {
143 e->code = STR(EPG_PROTO);
144 e->text = "ERROR";
145 e->msg = "Protocol error";
146 return -1;
148 switch ((conn->intr_error & EPG_HOST)) {
149 case HOST_NOT_FOUND:
150 e->code = STR(HOST_NOT_FOUND);
151 e->text = "ERROR";
152 e->msg = "The specified host is unknown.";
153 return -1;
154 case NO_DATA:
155 e->code = STR(NO_DATA);
156 e->text = "ERROR";
157 e->msg = "The requested name is valid but does not have an IP address. Another type of request to the name server for this domain may return an answer.";
158 return -1;
159 case NO_RECOVERY:
160 e->code = STR(NO_RECOVERY);
161 e->text = "ERROR";
162 e->msg = "A nonrecoverable name server error occurred.";
163 return -1;
164 case TRY_AGAIN:
165 e->code = STR(TRY_AGAIN);
166 e->text = "ERROR";
167 e->msg = "A temporary error occurred on an authoritative name server. Try again later.";
168 return -1;
170 if (conn->error) {
171 e->code = conn->error->code;
172 e->text = conn->error->text;
173 e->msg = conn->error->message;
174 e->detail = conn->error->detail;
175 return -1;
177 e->code = STR(ERRCODE_SUCCESSFUL_COMPLETION);
178 e->text = "SUCCESS";
179 e->msg = "Success";
180 return 0;
183 static void _pg_startup (pgconn_t *conn, const char *conn_info, conninfo_t *cinfo) {
184 pgmsg_t *msg = pgmsg_create_startup_params(conn_info);
185 if (-1 == pgmsg_send(conn->fd, msg))
186 goto done;
187 free(msg);
188 msg = NULL;
189 while (!conn->ready && !conn->error && 0 == pgmsg_recv(conn->fd, &msg)) {
190 pgmsg_resp_t *resp = NULL;
191 switch (msg->body.type) {
192 case PG_READY:
193 lst_adde(conn->msgs, msg);
194 resp = pgmsg_parse(msg);
195 conn->ready = &resp->msg_ready;
196 msg = NULL;
197 break;
198 case PG_ERROR:
199 lst_adde(conn->msgs, msg);
200 resp = pgmsg_parse(msg);
201 conn->error = &resp->msg_error;
202 msg = NULL;
203 break;
204 case PG_PARAMSTATUS:
205 resp = pgmsg_parse(msg);
206 if (0 == strcmp(resp->msg_param_status.name, "integer_datetimes"))
207 conn->is_int_datetimes = 0 == strcmp(resp->msg_param_status.value, "on") ? 1 : 0;
208 else
209 if (0 == strcmp(resp->msg_param_status.name, "is_superuser"))
210 conn->is_superuser = 0 == strcmp(resp->msg_param_status.value, "on") ? 1 : 0;
211 else
212 if (0 == strcmp(resp->msg_param_status.name, "server_version")) {
213 char *ver = strdup(resp->msg_param_status.value),
214 *p = strchr(ver, '.');
215 if (p) {
216 *p++ = '\0';
217 conn->minor_ver = strtol(p, NULL, 0);
219 conn->major_ver = strtol(ver, NULL, 0);
220 free(ver);
221 } else
222 if (0 == strcmp(resp->msg_param_status.name, "client_encoding"))
223 conn->client_encoding = strdup(resp->msg_param_status.value);
224 else
225 if (0 == strcmp(resp->msg_param_status.name, "server_encoding"))
226 conn->server_encoding = strdup(resp->msg_param_status.value);
227 else
228 if (0 == strcmp(resp->msg_param_status.name, "session_authorization"))
229 conn->session_auth = strdup(resp->msg_param_status.value);
230 else
231 if (0 == strcmp(resp->msg_param_status.name, "DateStyle"))
232 conn->date_style = strdup(resp->msg_param_status.value);
233 else
234 if (0 == strcmp(resp->msg_param_status.name, "TimeZone"))
235 conn->timezone = strdup(resp->msg_param_status.value);
236 if (resp) free(resp);
237 resp = NULL;
238 free(msg);
239 msg = NULL;
240 break;
241 case PG_AUTHOK:
242 resp = pgmsg_parse(msg);
243 if (!resp) continue;
244 conn->authok = resp->msg_auth.success;
245 switch (conn->authok) {
246 case PG_REQMD5:
247 case PG_REQPASS:
248 memcpy(cinfo->salt, resp->msg_auth.kind.md5_auth, sizeof(uint8_t) * 4);
249 free(resp);
250 free(msg);
251 msg = pgmsg_create_pass(conn->authok, cinfo->salt, sizeof(uint8_t) * 4, cinfo->user, cinfo->pass);
252 pgmsg_send(conn->fd, msg);
253 free(msg);
254 break;
255 case PG_REQSASL:
256 free(msg);
257 msg = pgmsg_create_sasl_init(cinfo);
258 pgmsg_send(conn->fd, msg);
259 break;
260 case PG_SASLCON:
261 free(msg);
262 msg = pgmsg_create_sasl_fin(resp, cinfo);
263 pgmsg_send(conn->fd, msg);
264 break;
265 case PG_OK:
266 case PG_SASLCOMP:
267 break;
268 default:
269 conn->intr_error = EPG_PROTO;
270 goto done;
272 free(msg);
273 free(resp);
274 msg = NULL;
275 break;
276 default:
277 free(msg);
278 msg = NULL;
279 break;
282 done:
283 if (msg)
284 free(msg);
287 static void *_parse_param (void *data, const char *begin, const char *end, unsigned int *flags) {
288 conninfo_t *cinfo = (conninfo_t*)data;
289 const char *p = begin, *p1;
290 while (p < end && '=' != *p) ++p;
291 if (p == end)
292 return data;
293 p1 = p;
294 while (p1 > begin && isspace(*(p1 - 1))) --p1;
295 if (p == p1 - 1)
296 return data;
297 ++p;
298 while (p < end && isspace(*p)) ++p;
299 if (p == end)
300 return data;
301 if (0 == strncmp(begin, "host", (uintptr_t)p1 - (uintptr_t)begin)) {
302 char *addr = strndup(p, (uintptr_t)end - (uintptr_t)p);
303 struct in_addr in_addr;
304 memset(&in_addr, 0, sizeof in_addr);
305 if (0 == atoaddr(addr, (struct in_addr*)&in_addr))
306 cinfo->in_addr.sin_addr = in_addr;
307 free(addr);
308 } else
309 if (0 == strncmp(begin, "port", (uintptr_t)p1 - (uintptr_t)begin)) {
310 char *port_s = strndup(p, (uintptr_t)end - (uintptr_t)p), *tail;
311 int port = strtol(port_s, &tail, 0);
312 if ('\0' == *tail && ERANGE != errno)
313 cinfo->in_addr.sin_port = htons(port);
314 else
315 *flags |= EPG_PORT;
316 free(port_s);
317 } else
318 if (0 == strncmp(begin, "user", (uintptr_t)p1 - (uintptr_t)begin)) {
319 if (cinfo->user)
320 free(cinfo->user);
321 cinfo->user = strndup(p, (uintptr_t)end - (uintptr_t)p);
322 } else
323 if (0 == strncmp(begin, "password", (uintptr_t)p1 - (uintptr_t)begin)) {
324 if (cinfo->pass)
325 free(cinfo->pass);
326 cinfo->pass = strndup(p, (uintptr_t)end - (uintptr_t)p);
328 return data;
331 static str_t *_parse_url (pgconn_t *conn, const char *url) {
332 char *str = strdup(url);
333 str_t *conninfo = stralloc(64, 64);
334 char *s = strstr(str, "://"), *e, *p,
335 *user = NULL,
336 *password = NULL,
337 *host = NULL,
338 *port = NULL,
339 *dbname = NULL;
340 if (!s) {
341 free(str);
342 return conninfo;
344 s += 3;
345 if (!*s) {
346 free(str);
347 return conninfo;
349 if ((e = strchr(s, '/')))
350 *e = '\0';
351 if ((p = strchr(s, '@'))) {
352 char *q;
353 *p = '\0';
354 user = s;
355 if ((q = strchr(s, ':'))) {
356 *q = '\0';
357 password = ++q;
359 s = p + 1;
361 host = s;
362 if ((p = strchr(s, ':'))) {
363 *p = '\0';
364 port = p + 1;
366 if (host) {
367 strnadd(&conninfo, CONST_STR_LEN("host="));
368 strnadd(&conninfo, host, strlen(host));
370 if (port) {
371 if (conninfo->len > 0)
372 strnadd(&conninfo, CONST_STR_LEN(" "));
373 strnadd(&conninfo, CONST_STR_LEN("port="));
374 strnadd(&conninfo, port, strlen(port));
376 if (user) {
377 if (conninfo->len > 0)
378 strnadd(&conninfo, CONST_STR_LEN(" "));
379 strnadd(&conninfo, CONST_STR_LEN("user="));
380 strnadd(&conninfo, user, strlen(user));
382 if (password) {
383 if (conninfo->len > 0)
384 strnadd(&conninfo, CONST_STR_LEN(" "));
385 strnadd(&conninfo, CONST_STR_LEN("password="));
386 strnadd(&conninfo, password, strlen(password));
388 if (!e || !*(++e)) {
389 free(str);
390 return conninfo;
392 s = e;
393 if ((e = strchr(s, '?')))
394 *e = '\0';
395 dbname = s;
396 if (conn->dbname) free(conn->dbname);
397 conn->dbname = strdup(dbname);
398 if (conninfo->len > 0) {
399 if (conninfo->len > 0)
400 strnadd(&conninfo, CONST_STR_LEN(" "));
401 strnadd(&conninfo, CONST_STR_LEN("dbname="));
402 strnadd(&conninfo, dbname, strlen(dbname));
404 if (!e || !*(++e)) {
405 free(str);
406 return conninfo;
408 s = e;
409 str_t *params = strsplit(s, strlen(s), '&');
410 strptr_t entry = { .ptr = NULL, .len = 0 };
411 while (-1 != strnext(params, &entry)) {
412 if ((p = strnchr(entry.ptr, '=', entry.len))) {
413 if (conninfo->len > 0)
414 strnadd(&conninfo, CONST_STR_LEN(" "));
415 strnadd(&conninfo, entry.ptr, entry.len);
418 free(params);
419 free(str);
420 return conninfo;
423 pgconn_t *pg_connect (const char *url) {
424 pgconn_t *conn;
425 conninfo_t cinfo = { .in_addr = {
426 .sin_family = AF_INET,
427 .sin_port = pg_port,
428 .sin_addr.s_addr = pg_addr },
429 .user = NULL, .pass = NULL,
430 .srv_scram_msg = NULL, .fmsg_bare = NULL,
431 .fmsg_srv = NULL, .fmsg_wproof = NULL,
432 .nonce = NULL };
433 int fd;
434 uint32_t flags = 0;
435 conn = calloc(1, sizeof(pgconn_t));
436 conn->msgs = lst_alloc(on_default_free_item);
437 conn->row_list = lst_alloc(on_default_free_item);
438 str_t *conn_info = _parse_url(conn, url);
439 parse_conninfo((void*)&cinfo, conn_info->ptr, _parse_param, &flags);
440 if (0 != (conn->intr_error = flags)) {
441 free(conn_info);
442 return conn;
444 fd = socket(AF_INET, SOCK_STREAM, 0);
445 if (-1 == connect(fd, (struct sockaddr*)&cinfo.in_addr, sizeof cinfo.in_addr)) {
446 free(conn_info);
447 close(fd);
448 conn->intr_error = EPG_ERRNO | errno;
449 return conn;
451 conn->fd = fd;
452 _pg_startup(conn, conn_info->ptr, &cinfo);
453 if (cinfo.user)
454 free(cinfo.user);
455 if (cinfo.pass)
456 free(cinfo.pass);
457 if (cinfo.srv_scram_msg)
458 free(cinfo.srv_scram_msg);
459 if (cinfo.fmsg_bare)
460 free(cinfo.fmsg_bare);
461 if (cinfo.fmsg_srv)
462 free(cinfo.fmsg_srv);
463 if (cinfo.fmsg_wproof)
464 free(cinfo.fmsg_wproof);
465 if (cinfo.nonce)
466 free(cinfo.nonce);
467 free(conn_info);
468 return conn;
471 static void _pg_close (pgconn_t *conn) {
472 if (conn->rows) {
473 free(conn->rows);
474 conn->rows = NULL;
476 if (conn->ready) {
477 free(conn->ready);
478 conn->ready = NULL;
480 if (conn->error) {
481 free(conn->error);
482 conn->error = NULL;
484 if (conn->rowdesc) {
485 free(conn->rowdesc);
486 conn->rowdesc = NULL;
488 if (conn->complete) {
489 free(conn->complete);
490 conn->complete = NULL;
492 conn->suspended = NULL;
493 lst_clear(conn->row_list);
494 lst_clear(conn->msgs);
495 conn->nflds = 0;
498 void pg_close (pgconn_t *conn) {
499 _pg_close(conn);
502 void pg_disconnect (pgconn_t *conn) {
503 pgmsg_t *msg = pgmsg_create(PG_TERM);
504 pgmsg_send(conn->fd, msg);
505 free(msg);
506 _pg_close(conn);
507 if (conn->date_style)
508 free(conn->date_style);
509 if (conn->client_encoding)
510 free(conn->client_encoding);
511 if (conn->server_encoding)
512 free(conn->server_encoding);
513 if (conn->session_auth)
514 free(conn->session_auth);
515 if (conn->timezone)
516 free(conn->timezone);
517 if (conn->dbname)
518 free(conn->dbname);
519 if (conn->nonce)
520 free(conn->nonce);
521 lst_free(conn->row_list);
522 lst_free(conn->msgs);
523 if (conn->fd > 0)
524 close(conn->fd);
525 free(conn);
528 typedef struct {
529 int i;
530 pgconn_t *conn;
531 } _on_row_t;
533 static int _on_set_row (list_item_t *li, void *userdata) {
534 _on_row_t *r = (_on_row_t*)userdata;
535 r->conn->rows[r->i++] = (pgmsg_datarow_t*)li->ptr;
536 return ENUM_CONTINUE;
539 static void _set_rows (pgconn_t *conn) {
540 _on_row_t r = { .i = 0, .conn = conn };
541 conn->rows = malloc(conn->row_list->len * sizeof(void*));
542 lst_enum(conn->row_list, _on_set_row, &r, 0);
545 static void _wait_ready (pgconn_t *conn) {
546 pgmsg_t *msg;
547 int is_ready = 0;
548 while (!is_ready && 0 == pgmsg_recv(conn->fd, &msg)) {
549 is_ready = PG_READY == msg->body.type;
550 free(msg);
554 static int _simple_exec (pgconn_t *conn) {
555 int rc = 0;
556 pgmsg_t *msg;
557 while (0 == pgmsg_recv(conn->fd, &msg)) {
558 pgmsg_resp_t *resp;
559 switch (msg->body.type) {
560 case PG_READY:
561 lst_adde(conn->msgs, msg);
562 resp = pgmsg_parse(msg);
563 conn->ready = &resp->msg_ready;
564 if (conn->row_list->len > 0)
565 _set_rows(conn);
566 goto done;
567 case PG_ERROR:
568 lst_adde(conn->msgs, msg);
569 resp = pgmsg_parse(msg);
570 conn->error = &resp->msg_error;
571 rc = -1;
572 _wait_ready(conn);
573 goto done;
574 case PG_ROWDESC:
575 lst_adde(conn->msgs, msg);
576 resp = pgmsg_parse(msg);
577 conn->rowdesc = &resp->msg_rowdesc;
578 conn->nflds = conn->rowdesc->nflds;
579 break;
580 case PG_CMDCOMPLETE:
581 lst_adde(conn->msgs, msg);
582 resp = pgmsg_parse(msg);
583 conn->complete = &resp->msg_complete;
584 break;
585 case PG_DATAROW:
586 lst_adde(conn->msgs, msg);
587 resp = pgmsg_parse(msg);
588 lst_adde(conn->row_list, &resp->msg_datarow);
589 break;
590 case PG_NODATA:
591 free(msg);
592 conn->nflds = 0;
593 break;
594 case PG_COPYIN:
595 lst_adde(conn->msgs, msg);
596 resp = pgmsg_parse(msg);
597 conn->cols = resp->msg_copyin.cols;
598 conn->fmt = resp->msg_copyin.fmt;
599 free(resp);
600 goto done;
601 default:
602 free(msg);
603 break;
606 done:
607 return rc;
610 static int _exec (pgconn_t *conn) {
611 int rc = 0;
612 pgmsg_t *msg;
613 _pg_close(conn);
614 while (0 == pgmsg_recv(conn->fd, &msg)) {
615 pgmsg_resp_t *resp;
616 switch (msg->body.type) {
617 case PG_READY:
618 lst_adde(conn->msgs, msg);
619 resp = pgmsg_parse(msg);
620 conn->ready = &resp->msg_ready;
621 if (conn->row_list->len > 0)
622 _set_rows(conn);
623 goto done;
624 case PG_PARSECOMPLETE:
625 free(msg);
626 break;
627 case PG_BINDCOMPLETE:
628 free(msg);
629 break;
630 case PG_ROWDESC:
631 lst_adde(conn->msgs, msg);
632 resp = pgmsg_parse(msg);
633 conn->rowdesc = &resp->msg_rowdesc;
634 conn->nflds = conn->rowdesc->nflds;
635 break;
636 case PG_PARAMDESC:
637 free(msg);
638 if (0 == pgmsg_recv(conn->fd, &msg)) {
639 switch (msg->body.type) {
640 case PG_ERROR:
641 lst_adde(conn->msgs, msg);
642 resp = pgmsg_parse(msg);
643 conn->error = &resp->msg_error;
644 rc = -1;
645 _wait_ready(conn);
646 break;
647 case PG_ROWDESC:
648 lst_adde(conn->msgs, msg);
649 resp = pgmsg_parse(msg);
650 conn->rowdesc = &resp->msg_rowdesc;
651 conn->nflds = conn->rowdesc->nflds;
652 break;
653 case PG_NODATA:
654 free(msg);
655 conn->nflds = 0;
656 break;
657 default:
658 free(msg);
659 break;
662 // ins
663 case PG_ERROR:
664 lst_adde(conn->msgs, msg);
665 resp = pgmsg_parse(msg);
666 conn->error = &resp->msg_error;
667 rc = -1;
668 _wait_ready(conn);
669 goto done;
670 case PG_CMDCOMPLETE:
671 lst_adde(conn->msgs, msg);
672 resp = pgmsg_parse(msg);
673 conn->complete = &resp->msg_complete;
674 break;
675 case PG_DATAROW:
676 lst_adde(conn->msgs, msg);
677 resp = pgmsg_parse(msg);
678 lst_adde(conn->row_list, &resp->msg_datarow);
679 break;
680 case PG_NODATA:
681 free(msg);
682 conn->nflds = 0;
683 break;
684 case PG_PORTALSUSPENDED:
685 lst_adde(conn->msgs, msg);
686 conn->suspended = msg;
687 break;
688 case PG_EMPTYQUERY:
689 lst_adde(conn->msgs, msg);
690 conn->emptyquery = msg;
691 break;
692 default:
693 free(msg);
694 break;
697 done:
698 return rc;
701 int pg_execsql (pgconn_t *conn, const char *sql, size_t sql_len) {
702 int rc;
703 pgmsg_t *msg;
704 _pg_close(conn);
705 if (0 == sql_len)
706 sql_len = strlen(sql);
707 msg = pgmsg_create_simple_query(sql, sql_len);
708 rc = pgmsg_send(conn->fd, msg);
709 free(msg);
710 if (0 == rc)
711 rc = _simple_exec(conn);
712 return rc;
715 int pg_copyin (pgconn_t *conn, const char *table_name) {
716 cstr_t *sql = cstrformat("copy %s from stdin", table_name);
717 int rc = pg_execsql(conn, sql->ptr, sql->len);
718 _pg_close(conn);
719 free(sql);
720 if (!rc) rc = conn->cols;
721 return rc;
724 int pg_copyin_sendln (pgconn_t *conn, pgfld_t **flds) {
725 pgmsg_t *msg = pgmsg_copyin_flds(conn->cols, flds);
726 if (!msg) return -1;
727 _pg_close(conn);
728 int rc = pgmsg_send(conn->fd, msg);
729 free(msg);
730 return rc;
733 int pg_copyin_end (pgconn_t *conn) {
734 pgmsg_t *msg = pgmsg_create(PG_COPYEND);
735 _pg_close(conn);
736 int rc = pgmsg_send(conn->fd, msg);
737 free(msg);
738 if (0 == rc)
739 rc = _simple_exec(conn);
740 return rc;
743 static int _pg_sync (pgconn_t *conn) {
744 int rc;
745 pgmsg_t *msg = pgmsg_create(PG_SYNC);
746 rc = pgmsg_send(conn->fd, msg);
747 free(msg);
748 return rc;
751 int pg_prepareln (pgconn_t *conn, const char *name, size_t name_len, const char *sql, size_t sql_len, int fld_len, pgfld_t **flds) {
752 int rc;
753 pgmsg_t *msg;
754 _pg_close(conn);
755 msg = pgmsg_create_parse(name, name_len, sql, sql_len, fld_len, flds);
756 rc = pgmsg_send(conn->fd, msg);
757 free(msg);
758 if (0 == rc && name && 0 == (rc = _pg_sync(conn)))
759 rc = _exec(conn);
760 return rc;
763 DEFINE_ARRAY(pgfld_array_t, pgfld_ptr_t);
765 static int pg_preparevn (pgconn_t *conn, const char *name, size_t name_len, const char *sql, size_t sql_len, pgfld_t *fld, va_list ap) {
766 int rc;
767 pgfld_array_t *flds;
768 INIT_ARRAY(pgfld_array_t, flds, 8, 8, NULL);
769 if (fld) {
770 ARRAY_ADD(flds, fld);
771 while (NULL != (fld = va_arg(ap, pgfld_ptr_t))) {
772 ARRAY_ADD(flds, fld);
775 rc = pg_prepareln(conn, name, name_len, sql, sql_len, flds->len, flds->ptr);
776 free(flds);
777 return rc;
780 int pg_preparen (pgconn_t *conn, const char *name, size_t name_len, const char *sql, size_t sql_len, pgfld_t *fld, ...) {
781 int rc;
782 va_list ap;
783 va_start(ap, fld);
784 rc = pg_preparevn(conn, name, name_len, sql, sql_len, fld, ap);
785 va_end(ap);
786 return rc;
789 int pg_prepare (pgconn_t *conn, const char *sql, size_t sql_len, pgfld_t *fld, ...) {
790 int rc;
791 va_list ap;
792 va_start(ap, fld);
793 rc = pg_preparevn(conn, CONST_STR_NULL, sql, sql_len, fld, ap);
794 va_end(ap);
795 return rc;
798 static int _pg_bind (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len,
799 int fld_len, pgfld_t **flds, int res_fmt_len, int *res_fmt) {
800 int rc;
801 pgmsg_t *msg;
802 _pg_close(conn);
803 msg = pgmsg_create_bind(portal, portal_len, stmt, stmt_len, fld_len, flds, res_fmt_len, res_fmt);
804 rc = pgmsg_send(conn->fd, msg);
805 free(msg);
806 return rc;
809 static int _pg_describe (pgconn_t *conn, int8_t op, const char *portal, size_t portal_len) {
810 int rc;
811 pgmsg_t *msg;
812 msg = pgmsg_create_describe(op, portal, portal_len);
813 rc = pgmsg_send(conn->fd, msg);
814 free(msg);
815 return rc;
818 static int _pg_execute (pgconn_t *conn, const char *portal, size_t portal_len, int max_rows) {
819 int rc;
820 pgmsg_t *msg;
821 msg = pgmsg_create_execute(portal, portal_len, max_rows);
822 rc = pgmsg_send(conn->fd, msg);
823 free(msg);
824 if (0 == rc && 0 == (rc = _pg_sync(conn)))
825 rc = _exec(conn);
826 return rc;
829 int pg_execln (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len,
830 int fld_len, pgfld_t **flds, int res_fmt_len, int *res_fmt, int max_data) {
831 int rc;
832 if (0 == (rc = _pg_bind(conn, portal, portal_len, stmt, stmt_len, fld_len, flds, res_fmt_len, res_fmt)) &&
833 0 == (rc = _pg_describe(conn, PG_PREPARED_PORTAL, portal, portal_len)))
834 rc = _pg_execute(conn, portal, portal_len, max_data);
835 return rc;
838 int pg_nextn (pgconn_t *conn, const char *portal, size_t portal_len, int max_data) {
839 return _pg_execute(conn, portal, portal_len, max_data);
842 int pg_execvn (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len, int max_data, int out_fmt, pgfld_t *fld, va_list ap) {
843 int rc;
844 pgfld_array_t *flds;
845 INIT_ARRAY(pgfld_array_t, flds, 8, 8, NULL);
846 if (fld) {
847 ARRAY_ADD(flds, fld);
848 while (NULL != (fld = va_arg(ap, pgfld_ptr_t))) {
849 ARRAY_ADD(flds, fld);
852 rc = pg_execln(conn, portal, portal_len, stmt, stmt_len, flds->len, flds->ptr, 1, &out_fmt, max_data);
853 free(flds);
854 return rc;
857 int pg_execn (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len, int max_data, int out_fmt, pgfld_t *fld, ...) {
858 int rc;
859 va_list ap;
860 va_start(ap, fld);
861 rc = pg_execvn(conn, portal, portal_len, stmt, stmt_len, max_data, out_fmt, fld, ap);
862 va_end(ap);
863 return rc;
866 int pg_exec (pgconn_t *conn, int max_data, int out_fmt, pgfld_t *fld, ...) {
867 int rc;
868 va_list ap;
869 va_start(ap, fld);
870 rc = pg_execvn(conn, CONST_STR_NULL, CONST_STR_NULL, max_data, out_fmt, fld, ap);
871 va_end(ap);
872 return rc;
875 int pg_release (pgconn_t *conn, const char *name, size_t name_len) {
876 pgmsg_t *msg = pgmsg_create_close(PG_OPNAME, name, 0 == name_len ? strlen(name) : name_len);
877 int rc = pgmsg_send(conn->fd, msg);
878 free(msg);
879 return rc;