Minor cleanups
[elliptics.git] / library / net.c
blobedc820dc7d44c9dcb24d85dd5425f9a99ea00091
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <poll.h>
24 #include <fcntl.h>
26 #include <netinet/tcp.h>
28 #include "elliptics.h"
29 #include "elliptics/packet.h"
30 #include "elliptics/interface.h"
32 #ifndef POLLRDHUP
33 #define POLLRDHUP 0x2000
34 #endif
36 static int dnet_socket_connect(struct dnet_node *n, int s, struct sockaddr *sa, unsigned int salen)
38 int err;
40 fcntl(s, F_SETFL, O_NONBLOCK);
41 fcntl(s, F_SETFD, FD_CLOEXEC);
43 err = connect(s, sa, salen);
44 if (err) {
45 struct pollfd pfd;
46 socklen_t slen;
47 int status;
49 pfd.fd = s;
50 pfd.revents = 0;
51 pfd.events = POLLOUT;
53 err = -errno;
54 if (err != -EINPROGRESS) {
55 dnet_log_err(n, "Failed to connect to %s:%d",
56 dnet_server_convert_addr(sa, salen),
57 dnet_server_convert_port(sa, salen));
58 goto err_out_exit;
61 err = poll(&pfd, 1, n->wait_ts.tv_sec * 1000 > 2000 ? n->wait_ts.tv_sec : 2000);
62 if (err < 0)
63 goto err_out_exit;
64 if (err == 0) {
65 err = -ETIMEDOUT;
66 dnet_log_err(n, "Failed to wait to connect to %s:%d",
67 dnet_server_convert_addr(sa, salen),
68 dnet_server_convert_port(sa, salen));
69 goto err_out_exit;
71 if ((!(pfd.revents & POLLOUT)) || (pfd.revents & (POLLERR | POLLHUP))) {
72 err = -ECONNREFUSED;
73 dnet_log_err(n, "Connection refused by %s:%d",
74 dnet_server_convert_addr(sa, salen),
75 dnet_server_convert_port(sa, salen));
76 goto err_out_exit;
79 status = 0;
80 slen = 4;
81 err = getsockopt(s, SOL_SOCKET, SO_ERROR, &status, &slen);
82 if (err || status) {
83 err = -errno;
84 if (!err)
85 err = -status;
86 dnet_log_err(n, "Failed to connect to %s:%d: %s [%d]",
87 dnet_server_convert_addr(sa, salen),
88 dnet_server_convert_port(sa, salen),
89 strerror(-err), err);
90 goto err_out_exit;
94 dnet_set_sockopt(s);
96 dnet_log(n, DNET_LOG_INFO, "Connected to %s:%d.\n",
97 dnet_server_convert_addr(sa, salen),
98 dnet_server_convert_port(sa, salen));
100 err = 0;
102 err_out_exit:
103 return err;
106 int dnet_socket_create_addr(struct dnet_node *n, int sock_type, int proto, int family,
107 struct sockaddr *sa, unsigned int salen, int listening)
109 int s, err = -1;
111 sa->sa_family = family;
112 s = socket(family, sock_type, proto);
113 if (s < 0) {
114 err = -errno;
115 dnet_log_err(n, "Failed to create socket for %s:%d: "
116 "family: %d, sock_type: %d, proto: %d",
117 dnet_server_convert_addr(sa, salen),
118 dnet_server_convert_port(sa, salen),
119 sa->sa_family, sock_type, proto);
120 goto err_out_exit;
123 if (listening) {
124 err = 1;
125 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &err, 4);
127 err = bind(s, sa, salen);
128 if (err) {
129 err = -errno;
130 dnet_log_err(n, "Failed to bind to %s:%d",
131 dnet_server_convert_addr(sa, salen),
132 dnet_server_convert_port(sa, salen));
133 goto err_out_close;
136 err = listen(s, 10240);
137 if (err) {
138 err = -errno;
139 dnet_log_err(n, "Failed to listen at %s:%d",
140 dnet_server_convert_addr(sa, salen),
141 dnet_server_convert_port(sa, salen));
142 goto err_out_close;
145 dnet_log(n, DNET_LOG_INFO, "Server is now listening at %s:%d.\n",
146 dnet_server_convert_addr(sa, salen),
147 dnet_server_convert_port(sa, salen));
149 fcntl(s, F_SETFL, O_NONBLOCK);
150 fcntl(s, F_SETFD, FD_CLOEXEC);
151 } else {
152 err = dnet_socket_connect(n, s, sa, salen);
153 if (err)
154 goto err_out_close;
157 return s;
159 err_out_close:
160 dnet_sock_close(s);
161 err_out_exit:
162 return err;
/*
 * Resolves @saddr/@port with getaddrinfo() using @family, @sock_type and
 * @proto as hints and copies the first result into @addr.
 *
 * On entry addr->addr_len must hold the capacity of addr->addr; on success
 * it is set to the length of the resolved address.
 *
 * Returns 0 on success, the getaddrinfo() error code if resolution failed,
 * -ENOENT if resolution produced no address, or -ENOBUFS if the resolved
 * address does not fit into @addr.
 */
int dnet_fill_addr(struct dnet_addr *addr, const char *saddr, const char *port, const int family,
		const int sock_type, const int proto)
{
	struct addrinfo *ai = NULL, hint;
	int err;

	memset(&hint, 0, sizeof(struct addrinfo));

	hint.ai_family = family;
	hint.ai_socktype = sock_type;
	hint.ai_protocol = proto;

	err = getaddrinfo(saddr, port, &hint, &ai);
	if (err)
		goto err_out_exit;

	if (ai == NULL) {
		/* never report success without having filled @addr */
		err = -ENOENT;
		goto err_out_exit;
	}

	if (addr->addr_len < ai->ai_addrlen) {
		err = -ENOBUFS;
		goto err_out_free;
	}

	addr->addr_len = ai->ai_addrlen;
	memcpy(addr->addr, ai->ai_addr, addr->addr_len);

err_out_free:
	freeaddrinfo(ai);
err_out_exit:
	return err;
}
195 int dnet_socket_create(struct dnet_node *n, struct dnet_config *cfg,
196 struct dnet_addr *addr, int listening)
198 int s, err = -EINVAL;
199 struct dnet_net_state *st;
201 if (cfg->sock_type != n->sock_type)
202 cfg->sock_type = n->sock_type;
203 if (cfg->proto != n->proto)
204 cfg->proto = n->proto;
206 err = dnet_fill_addr(addr, cfg->addr, cfg->port, cfg->family, cfg->sock_type, cfg->proto);
207 if (err) {
208 dnet_log(n, DNET_LOG_ERROR, "Failed to get address info for %s:%s, family: %d, err: %d: %s.\n",
209 cfg->addr, cfg->port, cfg->family, err, strerror(-err));
210 goto err_out_exit;
213 st = dnet_state_search_by_addr(n, addr);
214 if (st) {
215 dnet_log(n, DNET_LOG_ERROR, "Address %s:%s already exists in route table\n", cfg->addr, cfg->port);
216 err = -EEXIST;
217 dnet_state_put(st);
218 goto err_out_exit;
221 s = dnet_socket_create_addr(n, cfg->sock_type, cfg->proto, cfg->family,
222 (struct sockaddr *)addr->addr, addr->addr_len, listening);
223 if (s < 0) {
224 err = s;
225 goto err_out_exit;
228 return s;
230 err_out_exit:
231 return err;
234 static void dnet_state_clean(struct dnet_net_state *st)
236 struct rb_node *rb_node;
237 struct dnet_trans *t;
238 int num = 0;
240 while (1) {
241 t = NULL;
243 pthread_mutex_lock(&st->trans_lock);
244 rb_node = rb_first(&st->trans_root);
245 if (rb_node) {
246 t = rb_entry(rb_node, struct dnet_trans, trans_entry);
247 dnet_trans_get(t);
248 dnet_trans_remove_nolock(&st->trans_root, t);
249 list_del_init(&t->trans_list_entry);
251 pthread_mutex_unlock(&st->trans_lock);
253 if (!t)
254 break;
256 dnet_trans_put(t);
257 dnet_trans_put(t);
258 num++;
261 dnet_log(st->n, DNET_LOG_NOTICE, "Cleaned state %s, transactions freed: %d\n", dnet_state_dump_addr(st), num);
265 * Eventually we may end up with proper reference counters here, but for now let's just copy the whole buf.
266 * Large data blocks are being sent through sendfile anyway, so it should not be _that_ costly operation.
268 static int dnet_io_req_queue(struct dnet_net_state *st, struct dnet_io_req *orig)
270 void *buf;
271 struct dnet_io_req *r;
272 int offset = 0;
273 int err;
275 buf = r = malloc(sizeof(struct dnet_io_req) + orig->dsize + orig->hsize);
276 if (!r) {
277 err = -ENOMEM;
278 goto err_out_exit;
280 memset(r, 0, sizeof(struct dnet_io_req));
281 r->fd = -1;
283 if (orig->header && orig->hsize) {
284 r->header = buf + sizeof(struct dnet_io_req);
285 r->hsize = orig->hsize;
287 offset = r->hsize;
288 memcpy(r->header, orig->header, r->hsize);
291 if (orig->data && orig->dsize) {
292 r->data = buf + sizeof(struct dnet_io_req) + offset;
293 r->dsize = orig->dsize;
295 offset += r->dsize;
296 memcpy(r->data, orig->data, r->dsize);
299 if (orig->fd >= 0 && orig->fsize) {
300 r->fd = orig->fd;
301 r->close_on_exit = orig->close_on_exit;
302 r->local_offset = orig->local_offset;
303 r->fsize = orig->fsize;
306 pthread_mutex_lock(&st->send_lock);
307 list_add_tail(&r->req_entry, &st->send_list);
309 if (!st->need_exit)
310 dnet_schedule_send(st);
311 pthread_mutex_unlock(&st->send_lock);
313 return 0;
315 err_out_exit:
316 return err;
319 void dnet_io_req_free(struct dnet_io_req *r)
321 if (r->fd >= 0 && r->fsize && r->close_on_exit)
322 close(r->fd);
323 free(r);
326 static int dnet_wait(struct dnet_net_state *st, unsigned int events, long timeout)
328 struct pollfd pfd;
329 int err;
331 pfd.fd = st->read_s;
332 pfd.revents = 0;
333 pfd.events = events;
335 err = poll(&pfd, 1, timeout);
336 if (err < 0) {
337 if (errno == EAGAIN || errno == EINTR) {
338 err = -EAGAIN;
339 goto out_exit;
342 dnet_log(st->n, DNET_LOG_ERROR, "Failed to wait for descriptor: err: %d, socket: %d.\n",
343 err, st->read_s);
344 err = -errno;
345 goto out_exit;
348 if (err == 0) {
349 err = -EAGAIN;
350 goto out_exit;
353 if (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL)) {
354 dnet_log(st->n, DNET_LOG_DEBUG, "Connection reset by peer: sock: %d, revents: %x.\n",
355 st->read_s, pfd.revents);
356 err = -ECONNRESET;
357 goto out_exit;
360 if (pfd.revents & events) {
361 err = 0;
362 goto out_exit;
365 dnet_log(st->n, DNET_LOG_ERROR, "Socket reported error: sock: %d, revents: %x.\n",
366 st->read_s, pfd.revents);
367 err = -EINVAL;
368 out_exit:
369 if (st->n->need_exit || st->need_exit) {
370 dnet_log(st->n, DNET_LOG_ERROR, "Need to exit.\n");
371 err = -EIO;
374 return err;
377 ssize_t dnet_send_nolock(struct dnet_net_state *st, void *data, uint64_t size)
379 ssize_t err = 0;
380 struct dnet_node *n = st->n;
382 while (size) {
383 err = send(st->write_s, data, size, 0);
384 if (err < 0) {
385 err = -errno;
386 if (err != -EAGAIN)
387 dnet_log_err(n, "Failed to send packet: size: %llu, socket: %d",
388 (unsigned long long)size, st->write_s);
389 break;
392 if (err == 0) {
393 dnet_log(n, DNET_LOG_ERROR, "Peer %s has dropped the connection: socket: %d.\n", dnet_state_dump_addr(st), st->write_s);
394 err = -ECONNRESET;
395 break;
398 data += err;
399 size -= err;
400 st->send_offset += err;
402 err = 0;
405 return err;
408 ssize_t dnet_send(struct dnet_net_state *st, void *data, uint64_t size)
410 struct dnet_io_req r;
412 memset(&r, 0, sizeof(r));
413 r.data = data;
414 r.dsize = size;
415 r.fd = -1;
417 return dnet_io_req_queue(st, &r);
420 ssize_t dnet_send_data(struct dnet_net_state *st, void *header, uint64_t hsize, void *data, uint64_t dsize)
422 struct dnet_io_req r;
424 memset(&r, 0, sizeof(r));
425 r.header = header;
426 r.hsize = hsize;
427 r.data = data;
428 r.dsize = dsize;
429 r.fd = -1;
431 return dnet_io_req_queue(st, &r);
434 static ssize_t dnet_send_fd_nolock(struct dnet_net_state *st, int fd, uint64_t offset, uint64_t dsize)
436 ssize_t err;
438 while (dsize) {
439 err = dnet_sendfile(st, fd, &offset, dsize);
440 if (err < 0)
441 break;
442 if (err == 0) {
443 err = -ENODATA;
444 dnet_log_err(st->n, "Looks like truncated file: fd: %d, offset: %llu, size: %llu.\n",
445 fd, (unsigned long long)offset, (unsigned long long)dsize);
446 break;
449 dsize -= err;
450 st->send_offset += err;
451 err = 0;
454 return err;
457 ssize_t dnet_send_fd(struct dnet_net_state *st, void *header, uint64_t hsize,
458 int fd, uint64_t offset, uint64_t fsize, int close_on_exit)
460 struct dnet_io_req r;
462 memset(&r, 0, sizeof(r));
463 r.header = header;
464 r.hsize = hsize;
465 r.fd = fd;
466 r.close_on_exit = close_on_exit;
467 r.local_offset = offset;
468 r.fsize = fsize;
470 return dnet_io_req_queue(st, &r);
473 static void dnet_trans_timestamp(struct dnet_net_state *st, struct dnet_trans *t)
475 gettimeofday(&t->time, NULL);
476 t->time.tv_sec += st->n->wait_ts.tv_sec;
478 list_move_tail(&t->trans_list_entry, &st->trans_list);
481 int dnet_trans_send(struct dnet_trans *t, struct dnet_io_req *req)
483 struct dnet_net_state *st = req->st;
484 int err;
486 dnet_trans_get(t);
488 pthread_mutex_lock(&st->trans_lock);
489 err = dnet_trans_insert_nolock(&st->trans_root, t);
490 if (!err)
491 dnet_trans_timestamp(st, t);
492 pthread_mutex_unlock(&st->trans_lock);
493 if (err)
494 goto err_out_put;
496 err = dnet_io_req_queue(st, req);
497 if (err)
498 goto err_out_remove;
500 dnet_trans_put(t);
501 return 0;
503 err_out_remove:
504 dnet_trans_remove(t);
505 err_out_put:
506 dnet_trans_put(t);
507 return err;
510 int dnet_recv(struct dnet_net_state *st, void *data, unsigned int size)
512 int err;
513 int wait = st->n->wait_ts.tv_sec;
515 while (size) {
516 err = dnet_wait(st, POLLIN, 1000);
517 if (err < 0) {
518 if (err == -EAGAIN) {
519 if (--wait > 0)
520 continue;
522 err = -ETIMEDOUT;
524 return err;
527 err = recv(st->read_s, data, size, 0);
528 if (err < 0) {
529 dnet_log_err(st->n, "Failed to recv packet: size: %u", size);
530 return err;
533 if (err == 0) {
534 dnet_log(st->n, DNET_LOG_ERROR, "dnet_recv: peer %s has disconnected.\n",
535 dnet_server_convert_dnet_addr(&st->addr));
536 return -ECONNRESET;
539 data += err;
540 size -= err;
541 wait = st->n->wait_ts.tv_sec;
544 return 0;
547 static struct dnet_trans *dnet_trans_new(struct dnet_net_state *st)
549 struct dnet_trans *t;
551 t = dnet_trans_alloc(st->n, 0);
552 if (!t)
553 goto err_out_exit;
555 return t;
557 err_out_exit:
558 return NULL;
561 int dnet_add_reconnect_state(struct dnet_node *n, struct dnet_addr *addr, unsigned int join_state)
563 struct dnet_addr_storage *a, *it;
564 int err = 0;
566 if (!join_state || n->need_exit) {
567 if (!join_state)
568 dnet_log(n, DNET_LOG_INFO, "Do not add reconnection addr: %s, join state: %x.\n",
569 dnet_server_convert_dnet_addr(addr), join_state);
570 goto out_exit;
573 a = malloc(sizeof(struct dnet_addr_storage));
574 if (!a) {
575 err = -ENOMEM;
576 goto out_exit;
578 memset(a, 0, sizeof(struct dnet_addr_storage));
580 memcpy(&a->addr, addr, sizeof(struct dnet_addr));
581 a->__join_state = join_state;
583 pthread_mutex_lock(&n->reconnect_lock);
584 list_for_each_entry(it, &n->reconnect_list, reconnect_entry) {
585 if (!memcmp(&it->addr, &a->addr, sizeof(struct dnet_addr))) {
586 dnet_log(n, DNET_LOG_INFO, "Address already exists in reconnection array: addr: %s, join state: %x.\n",
587 dnet_server_convert_dnet_addr(&a->addr), join_state);
588 err = -EEXIST;
589 break;
593 if (!err) {
594 dnet_log(n, DNET_LOG_INFO, "Added reconnection addr: %s, join state: %x.\n",
595 dnet_server_convert_dnet_addr(&a->addr), join_state);
596 list_add_tail(&a->reconnect_entry, &n->reconnect_list);
598 pthread_mutex_unlock(&n->reconnect_lock);
600 if (err)
601 free(a);
603 out_exit:
604 return err;
607 static int dnet_trans_complete_forward(struct dnet_net_state *state __unused, struct dnet_cmd *cmd, void *priv)
609 struct dnet_trans *t = priv;
610 struct dnet_net_state *orig = t->orig;
611 int err = -EINVAL;
613 if (!is_trans_destroyed(state, cmd)) {
614 uint64_t size = cmd->size;
616 cmd->trans = t->rcv_trans | DNET_TRANS_REPLY;
618 dnet_convert_cmd(cmd);
620 err = dnet_send_data(orig, cmd, sizeof(struct dnet_cmd), cmd + 1, size);
623 return err;
626 static int dnet_trans_forward(struct dnet_trans *t, struct dnet_io_req *r,
627 struct dnet_net_state *orig, struct dnet_net_state *forward)
629 struct dnet_cmd *cmd = r->header;
631 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
633 t->rcv_trans = cmd->trans;
634 cmd->trans = t->cmd.trans = t->trans = atomic_inc(&orig->n->trans);
636 dnet_convert_cmd(cmd);
638 t->command = cmd->cmd;
639 t->complete = dnet_trans_complete_forward;
640 t->priv = t;
642 t->orig = dnet_state_get(orig);
643 t->st = dnet_state_get(forward);
645 r->st = forward;
648 char saddr[128];
649 char daddr[128];
651 dnet_log(orig->n, DNET_LOG_INFO, "%s: forwarding %s trans: %s -> %s, trans: %llu -> %llu\n",
652 dnet_dump_id(&t->cmd.id), dnet_cmd_string(t->command),
653 dnet_server_convert_dnet_addr_raw(&orig->addr, saddr, sizeof(saddr)),
654 dnet_server_convert_dnet_addr_raw(&forward->addr, daddr, sizeof(daddr)),
655 (unsigned long long)t->rcv_trans, (unsigned long long)t->trans);
658 return dnet_trans_send(t, r);
661 int dnet_process_recv(struct dnet_net_state *st, struct dnet_io_req *r)
663 int err = 0;
664 struct dnet_trans *t = NULL;
665 struct dnet_node *n = st->n;
666 struct dnet_net_state *forward_state;
667 struct dnet_cmd *cmd = r->header;
669 if (cmd->trans & DNET_TRANS_REPLY) {
670 uint64_t tid = cmd->trans & ~DNET_TRANS_REPLY;
672 pthread_mutex_lock(&st->trans_lock);
673 t = dnet_trans_search(&st->trans_root, tid);
674 if (t) {
675 if (!(cmd->flags & DNET_FLAGS_MORE)) {
676 dnet_trans_remove_nolock(&st->trans_root, t);
677 list_del_init(&t->trans_list_entry);
678 } else
679 dnet_trans_timestamp(st, t);
681 pthread_mutex_unlock(&st->trans_lock);
683 if (!t) {
684 dnet_log(n, DNET_LOG_ERROR, "%s: could not find transaction for reply: trans %llu.\n",
685 dnet_dump_id(&cmd->id), (unsigned long long)tid);
686 err = 0;
687 goto err_out_exit;
690 if (t->complete)
691 t->complete(t->st, cmd, t->priv);
693 dnet_trans_put(t);
694 if (!(cmd->flags & DNET_FLAGS_MORE)) {
695 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
696 dnet_trans_put(t);
698 goto out;
700 #if 1
701 forward_state = dnet_state_get_first(n, &cmd->id);
702 if (!forward_state || forward_state == st || forward_state == n->st ||
703 (st->rcv_cmd.flags & DNET_FLAGS_DIRECT)) {
704 dnet_state_put(forward_state);
706 err = dnet_process_cmd_raw(st, cmd, r->data);
707 goto out;
710 t = dnet_trans_new(st);
711 if (!t) {
712 err = -ENOMEM;
713 goto err_out_put_forward;
716 err = dnet_trans_forward(t, r, st, forward_state);
717 if (err)
718 goto err_out_destroy;
720 dnet_state_put(forward_state);
721 #else
722 err = dnet_process_cmd_raw(st, cmd, r->data);
723 #endif
724 out:
725 return err;
727 err_out_destroy:
728 dnet_trans_put(t);
729 err_out_put_forward:
730 dnet_state_put(forward_state);
731 err_out_exit:
732 if (t)
733 dnet_log(n, DNET_LOG_ERROR, "%s: error during received transaction processing: trans %llu, reply: %d, error: %d.\n",
734 dnet_dump_id(&t->cmd.id), (t->cmd.trans & ~DNET_TRANS_REPLY),
735 !!(t->cmd.trans & DNET_TRANS_REPLY), err);
736 return err;
739 void dnet_state_remove_nolock(struct dnet_net_state *st)
741 list_del_init(&st->state_entry);
742 list_del_init(&st->storage_state_entry);
743 dnet_idc_destroy_nolock(st);
746 static void dnet_state_remove(struct dnet_net_state *st)
748 struct dnet_node *n = st->n;
750 pthread_mutex_lock(&n->state_lock);
751 dnet_state_remove_nolock(st);
752 pthread_mutex_unlock(&n->state_lock);
755 void dnet_state_reset(struct dnet_net_state *st)
757 dnet_state_remove(st);
759 pthread_mutex_lock(&st->send_lock);
760 if (!st->need_exit)
761 st->need_exit = -ECONNRESET;
762 dnet_unschedule_send(st);
763 pthread_mutex_unlock(&st->send_lock);
765 dnet_unschedule_recv(st);
767 dnet_add_reconnect_state(st->n, &st->addr, st->__join_state);
769 dnet_state_clean(st);
770 dnet_state_put(st);
/*
 * Shuts socket @s down in both directions and closes the descriptor.
 */
void dnet_sock_close(int s)
{
	/* 2 means "both directions" (SHUT_RDWR) */
	const int both_directions = 2;

	shutdown(s, both_directions);
	close(s);
}
/*
 * Applies the common set of options to a connected socket @s:
 * TCP keepalive (3 probes, 10 seconds idle, 10 seconds between probes),
 * a 1 second linger on close, close-on-exec and non-blocking mode.
 * Failures of the individual calls are ignored.
 */
void dnet_set_sockopt(int s)
{
	struct linger lng;
	int value;

	value = 1;
	setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &value, 4);

	value = 3;
	setsockopt(s, IPPROTO_TCP, TCP_KEEPCNT, &value, 4);

	value = 10;
	setsockopt(s, IPPROTO_TCP, TCP_KEEPIDLE, &value, 4);

	value = 10;
	setsockopt(s, IPPROTO_TCP, TCP_KEEPINTVL, &value, 4);

	lng.l_onoff = 1;
	lng.l_linger = 1;
	setsockopt(s, SOL_SOCKET, SO_LINGER, &lng, sizeof(lng));

	fcntl(s, F_SETFD, FD_CLOEXEC);
	fcntl(s, F_SETFL, O_NONBLOCK);
}
803 int dnet_setup_control_nolock(struct dnet_net_state *st)
805 struct dnet_node *n = st->n;
806 struct dnet_io *io = n->io;
807 int err, pos;
809 if (st->epoll_fd == -1) {
810 pos = io->net_thread_pos;
811 if (++io->net_thread_pos >= io->net_thread_num)
812 io->net_thread_pos = 0;
813 st->epoll_fd = io->net[pos].epoll_fd;
815 err = dnet_schedule_recv(st);
816 if (err)
817 goto err_out_unschedule;
820 return 0;
822 err_out_unschedule:
823 dnet_unschedule_send(st);
824 dnet_unschedule_recv(st);
826 st->epoll_fd = -1;
827 list_del_init(&st->storage_state_entry);
828 return err;
831 static int dnet_auth_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv __unused)
833 struct dnet_node *n;
835 if (!state || !cmd)
836 return -EPERM;
838 /* this means this callback at least has state and cmd */
839 if (!is_trans_destroyed(state, cmd)) {
840 n = state->n;
842 if (cmd->status == 0) {
843 dnet_log(n, DNET_LOG_INFO, "%s: authentication request suceeded\n", dnet_state_dump_addr(state));
844 return 0;
847 dnet_log(n, DNET_LOG_ERROR, "%s: authentication request failed: %d\n", dnet_state_dump_addr(state), cmd->status);
849 state->__join_state = 0;
850 dnet_state_reset(state);
853 return cmd->status;
856 static int dnet_auth_send(struct dnet_net_state *st)
858 struct dnet_node *n = st->n;
859 struct dnet_trans_control ctl;
860 struct dnet_auth a;
862 memset(&a, 0, sizeof(struct dnet_auth));
864 memcpy(a.cookie, n->cookie, DNET_AUTH_COOKIE_SIZE);
865 dnet_convert_auth(&a);
867 memset(&ctl, 0, sizeof(struct dnet_trans_control));
869 ctl.cmd = DNET_CMD_AUTH;
870 ctl.cflags = DNET_FLAGS_DIRECT | DNET_FLAGS_NEED_ACK;
871 ctl.size = sizeof(struct dnet_auth);
872 ctl.data = &a;
874 ctl.complete = dnet_auth_complete;
876 return dnet_trans_alloc_send_state(st, &ctl);
/*
 * dnet_state_create() - allocates and initializes a network state for
 * socket @s connected to @addr.
 *
 * NOTE(review): brace-only and blank lines are missing from this copy of
 * the file, so the block nesting described below is inferred from the
 * control flow - confirm against the repository version.
 *
 * On success the new state is returned. On failure NULL is returned, the
 * error is stored in *@errp and, on the paths that unwind through
 * err_out_close, socket @s is closed.
 */
879 struct dnet_net_state *dnet_state_create(struct dnet_node *n,
880 int group_id, struct dnet_raw_id *ids, int id_num,
881 struct dnet_addr *addr, int s, int *errp, int join,
882 int (* process)(struct dnet_net_state *st, struct epoll_event *ev))
884 int err = -ENOMEM;
885 struct dnet_net_state *st;
/* When ids are supplied, refuse an address that already has a state. */
887 if (ids && id_num) {
888 st = dnet_state_search_by_addr(n, addr);
889 if (st) {
890 err = -EEXIST;
891 dnet_state_put(st);
892 goto err_out_close;
896 st = malloc(sizeof(struct dnet_net_state));
897 if (!st)
898 goto err_out_close;
900 memset(st, 0, sizeof(struct dnet_net_state));
/* The state uses @s for reading and a duplicate of it for writing. */
902 st->read_s = s;
903 st->write_s = dup(s);
904 if (st->write_s < 0) {
905 err = -errno;
906 dnet_log_err(n, "%s: failed to duplicate socket", dnet_server_convert_dnet_addr(addr));
907 goto err_out_free;
910 fcntl(st->write_s, F_SETFD, FD_CLOEXEC);
912 st->n = n;
914 st->process = process;
/* Initial values of the state statistics. */
916 st->la = 1;
917 st->weight = DNET_STATE_MAX_WEIGHT / 2;
918 st->median_read_time = 1000; /* useconds for start */
920 INIT_LIST_HEAD(&st->state_entry);
921 INIT_LIST_HEAD(&st->storage_state_entry);
923 st->trans_root = RB_ROOT;
924 INIT_LIST_HEAD(&st->trans_list);
/* -1 marks a state that is not yet attached to a network thread. */
926 st->epoll_fd = -1;
928 err = pthread_mutex_init(&st->trans_lock, NULL);
929 if (err) {
930 err = -err;
931 dnet_log_err(n, "Failed to initialize transaction mutex: %d", err);
932 goto err_out_dup_destroy;
935 INIT_LIST_HEAD(&st->send_list);
936 err = pthread_mutex_init(&st->send_lock, NULL);
937 if (err) {
938 err = -err;
939 dnet_log_err(n, "Failed to initialize send mutex: %d", err);
940 goto err_out_trans_destroy;
943 atomic_init(&st->refcnt, 1);
945 memcpy(&st->addr, addr, sizeof(struct dnet_addr));
947 dnet_schedule_command(st);
948 st->__join_state = join;
/*
 * Optional IP_TOS setup for both descriptors. Failures are only logged;
 * err is overwritten by the calls that follow.
 */
950 if (n->client_prio) {
951 err = setsockopt(st->read_s, IPPROTO_IP, IP_TOS, &n->client_prio, 4);
952 if (err) {
953 err = -errno;
954 dnet_log_err(n, "could not set read client prio %d", n->client_prio);
956 err = setsockopt(st->write_s, IPPROTO_IP, IP_TOS, &n->client_prio, 4);
957 if (err) {
958 err = -errno;
959 dnet_log_err(n, "could not set write client prio %d", n->client_prio);
962 if (!err) {
963 dnet_log(n, DNET_LOG_INFO, "%s: client net TOS value set to %d\n",
964 dnet_server_convert_dnet_addr(addr), n->client_prio);
969 * it is possible that state can be removed after inserted into route table,
970 * so we should grab a reference here and drop it after we are done
972 dnet_state_get(st);
/*
 * With ids: create the id container and, for a joining state whose address
 * is not the node's own, join and send the authentication request.
 * Without ids: put the state on the empty and storage state lists and
 * attach it to a network thread.
 * NOTE(review): presumably dnet_auth_send() sits inside the join branch;
 * the closing brace position cannot be confirmed from this copy.
 */
974 if (ids && id_num) {
975 err = dnet_idc_create(st, group_id, ids, id_num);
976 if (err)
977 goto err_out_send_destroy;
979 if ((st->__join_state == DNET_JOIN) && (addr != &n->addr)) {
980 pthread_mutex_lock(&n->state_lock);
981 err = dnet_state_join_nolock(st);
982 pthread_mutex_unlock(&n->state_lock);
984 err = dnet_auth_send(st);
986 } else {
987 pthread_mutex_lock(&n->state_lock);
988 list_add_tail(&st->state_entry, &n->empty_state_list);
989 list_add_tail(&st->storage_state_entry, &n->storage_state_list);
991 err = dnet_setup_control_nolock(st);
992 if (err)
993 goto err_out_unlock;
994 pthread_mutex_unlock(&n->state_lock);
/*
 * If only our temporary reference is left, the state has already been
 * dropped by someone else: report its exit reason (or -ECONNRESET).
 */
997 if (atomic_read(&st->refcnt) == 1) {
998 err = st->need_exit;
999 if (!err)
1000 err = -ECONNRESET;
1002 dnet_state_put(st);
1004 if (err)
1005 goto err_out_exit;
1007 return st;
/* Error unwinding: each label releases what was set up before its goto. */
1009 err_out_unlock:
1010 list_del_init(&st->state_entry);
1011 pthread_mutex_unlock(&n->state_lock);
1012 err_out_send_destroy:
1013 dnet_state_put(st);
1014 pthread_mutex_destroy(&st->send_lock);
1015 err_out_trans_destroy:
1016 pthread_mutex_destroy(&st->trans_lock);
1017 err_out_dup_destroy:
1018 dnet_sock_close(st->write_s);
1019 err_out_free:
1020 free(st);
1021 err_out_close:
1022 dnet_sock_close(s);
1024 err_out_exit:
1025 if (err == -EEXIST)
1026 dnet_log(n, DNET_LOG_NOTICE, "%s: state already exists.\n", dnet_server_convert_dnet_addr(addr));
1027 *errp = err;
1028 return NULL;
1031 int dnet_state_num(struct dnet_session *s)
1033 struct dnet_node *n = s->node;
1034 struct dnet_net_state *st;
1035 struct dnet_group *g;
1036 int num = 0;
1038 pthread_mutex_lock(&n->state_lock);
1039 list_for_each_entry(g, &n->group_list, group_entry) {
1040 list_for_each_entry(st, &g->state_list, state_entry)
1041 num++;
1043 pthread_mutex_unlock(&n->state_lock);
1045 return num;
1048 static void dnet_state_send_clean(struct dnet_net_state *st)
1050 struct dnet_io_req *r, *tmp;
1052 list_for_each_entry_safe(r, tmp, &st->send_list, req_entry) {
1053 list_del(&r->req_entry);
1054 dnet_io_req_free(r);
1058 void dnet_state_destroy(struct dnet_net_state *st)
1060 dnet_state_remove(st);
1062 if (st->read_s >= 0) {
1063 dnet_sock_close(st->read_s);
1064 dnet_sock_close(st->write_s);
1067 dnet_state_clean(st);
1069 dnet_state_send_clean(st);
1071 pthread_mutex_destroy(&st->send_lock);
1072 pthread_mutex_destroy(&st->trans_lock);
1074 dnet_log(st->n, DNET_LOG_NOTICE, "Freeing state %s, socket: %d/%d.\n",
1075 dnet_server_convert_dnet_addr(&st->addr), st->read_s, st->write_s);
1077 free(st);
1080 int dnet_send_reply(void *state, struct dnet_cmd *cmd, void *odata, unsigned int size, int more)
1082 struct dnet_net_state *st = state;
1083 struct dnet_cmd *c;
1084 void *data;
1085 int err;
1087 if (st == st->n->st)
1088 return 0;
1090 c = malloc(sizeof(struct dnet_cmd) + size);
1091 if (!c)
1092 return -ENOMEM;
1094 memset(c, 0, sizeof(struct dnet_cmd) + size);
1096 data = c + 1;
1097 *c = *cmd;
1099 if ((cmd->flags & DNET_FLAGS_NEED_ACK) || more)
1100 c->flags |= DNET_FLAGS_MORE;
1102 c->size = size;
1103 c->trans |= DNET_TRANS_REPLY;
1105 if (size)
1106 memcpy(data, odata, size);
1108 dnet_log(st->n, DNET_LOG_NOTICE, "%s: %s: reply: size: %u, cflags: %llx.\n",
1109 dnet_dump_id(&cmd->id), dnet_cmd_string(cmd->cmd), size, (unsigned long long)c->flags);
1111 dnet_convert_cmd(c);
1113 err = dnet_send(st, c, sizeof(struct dnet_cmd) + size);
1114 free(c);
1116 return err;
1119 int dnet_send_request(struct dnet_net_state *st, struct dnet_io_req *r)
1121 int cork;
1122 int err = 0;
1123 size_t offset = st->send_offset;
1125 /* Use TCP_CORK to send headers and packet body in one piece */
1126 cork = 1;
1127 setsockopt(st->write_s, IPPROTO_TCP, TCP_CORK, &cork, 4);
1129 if (r->hsize && r->header && st->send_offset < r->hsize) {
1130 err = dnet_send_nolock(st, r->header + offset, r->hsize - offset);
1131 if (err)
1132 goto err_out_exit;
1135 if (r->dsize && r->data && st->send_offset < (r->dsize + r->hsize)) {
1136 offset = st->send_offset - r->hsize;
1137 err = dnet_send_nolock(st, r->data + offset, r->dsize - offset);
1138 if (err)
1139 goto err_out_exit;
1142 if (r->fd >= 0 && r->fsize && st->send_offset < (r->dsize + r->hsize + r->fsize)) {
1143 offset = st->send_offset - r->dsize - r->hsize;
1144 err = dnet_send_fd_nolock(st, r->fd, r->local_offset + offset, r->fsize - offset);
1145 if (err)
1146 goto err_out_exit;
1149 if (r->hsize > sizeof(struct dnet_cmd)) {
1150 struct dnet_cmd *cmd = r->header;
1151 int nonblocking = !!(cmd->flags & DNET_FLAGS_NOLOCK);
1153 dnet_log(st->n, DNET_LOG_DEBUG, "%s: %s: SENT %s cmd: %s: cmd-size: %llu, nonblocking: %d\n",
1154 dnet_state_dump_addr(st), dnet_dump_id(r->header),
1155 nonblocking ? "nonblocking" : "blocking",
1156 dnet_cmd_string(cmd->cmd),
1157 (unsigned long long)cmd->size, nonblocking);
1160 err_out_exit:
1161 if (st->send_offset == (r->dsize + r->hsize + r->fsize)) {
1162 pthread_mutex_lock(&st->send_lock);
1163 list_del(&r->req_entry);
1164 pthread_mutex_unlock(&st->send_lock);
1166 dnet_io_req_free(r);
1167 st->send_offset = 0;
1170 if (err && err != -EAGAIN) {
1171 dnet_log(st->n, DNET_LOG_ERROR, "%s: setting send need_exit to %d\n", dnet_state_dump_addr(st), err);
1172 st->need_exit = err;
1175 cork = 0;
1176 setsockopt(st->write_s, IPPROTO_TCP, TCP_CORK, &cork, 4);
1178 return err;
1181 int dnet_parse_addr(char *addr, struct dnet_config *cfg)
1183 char *fam, *port;
1185 fam = strrchr(addr, DNET_CONF_ADDR_DELIM);
1186 if (!fam)
1187 goto err_out_print_wrong_param;
1188 *fam++ = 0;
1189 if (!fam)
1190 goto err_out_print_wrong_param;
1192 cfg->family = atoi(fam);
1194 port = strrchr(addr, DNET_CONF_ADDR_DELIM);
1195 if (!port)
1196 goto err_out_print_wrong_param;
1197 *port++ = 0;
1198 if (!port)
1199 goto err_out_print_wrong_param;
1201 memset(cfg->addr, 0, sizeof(cfg->addr));
1202 memset(cfg->port, 0, sizeof(cfg->port));
1204 snprintf(cfg->addr, sizeof(cfg->addr), "%s", addr);
1205 snprintf(cfg->port, sizeof(cfg->port), "%s", port);
1207 if (!strcmp(addr, "hostname")) {
1208 int err;
1210 err = gethostname(cfg->addr, sizeof(cfg->addr));
1211 if (err) {
1212 err = -errno;
1213 fprintf(stderr, "Could not get hostname: %s %d\n", strerror(-err), err);
1214 goto err_out_print_wrong_param;
1218 return 0;
1220 err_out_print_wrong_param:
1221 fprintf(stderr, "Wrong address parameter '%s', should be 'addr%cport%cfamily'.\n",
1222 addr, DNET_CONF_ADDR_DELIM, DNET_CONF_ADDR_DELIM);
1223 return -EINVAL;