2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
26 #include <netinet/tcp.h>
28 #include "elliptics.h"
29 #include "elliptics/packet.h"
30 #include "elliptics/interface.h"
33 #define POLLRDHUP 0x2000
36 static int dnet_socket_connect(struct dnet_node
*n
, int s
, struct sockaddr
*sa
, unsigned int salen
)
40 fcntl(s
, F_SETFL
, O_NONBLOCK
);
41 fcntl(s
, F_SETFD
, FD_CLOEXEC
);
43 err
= connect(s
, sa
, salen
);
54 if (err
!= -EINPROGRESS
) {
55 dnet_log_err(n
, "Failed to connect to %s:%d",
56 dnet_server_convert_addr(sa
, salen
),
57 dnet_server_convert_port(sa
, salen
));
61 err
= poll(&pfd
, 1, n
->wait_ts
.tv_sec
* 1000 > 2000 ? n
->wait_ts
.tv_sec
: 2000);
66 dnet_log_err(n
, "Failed to wait to connect to %s:%d",
67 dnet_server_convert_addr(sa
, salen
),
68 dnet_server_convert_port(sa
, salen
));
71 if ((!(pfd
.revents
& POLLOUT
)) || (pfd
.revents
& (POLLERR
| POLLHUP
))) {
73 dnet_log_err(n
, "Connection refused by %s:%d",
74 dnet_server_convert_addr(sa
, salen
),
75 dnet_server_convert_port(sa
, salen
));
81 err
= getsockopt(s
, SOL_SOCKET
, SO_ERROR
, &status
, &slen
);
86 dnet_log_err(n
, "Failed to connect to %s:%d: %s [%d]",
87 dnet_server_convert_addr(sa
, salen
),
88 dnet_server_convert_port(sa
, salen
),
96 dnet_log(n
, DNET_LOG_INFO
, "Connected to %s:%d.\n",
97 dnet_server_convert_addr(sa
, salen
),
98 dnet_server_convert_port(sa
, salen
));
106 int dnet_socket_create_addr(struct dnet_node
*n
, int sock_type
, int proto
, int family
,
107 struct sockaddr
*sa
, unsigned int salen
, int listening
)
111 sa
->sa_family
= family
;
112 s
= socket(family
, sock_type
, proto
);
115 dnet_log_err(n
, "Failed to create socket for %s:%d: "
116 "family: %d, sock_type: %d, proto: %d",
117 dnet_server_convert_addr(sa
, salen
),
118 dnet_server_convert_port(sa
, salen
),
119 sa
->sa_family
, sock_type
, proto
);
125 setsockopt(s
, SOL_SOCKET
, SO_REUSEADDR
, &err
, 4);
127 err
= bind(s
, sa
, salen
);
130 dnet_log_err(n
, "Failed to bind to %s:%d",
131 dnet_server_convert_addr(sa
, salen
),
132 dnet_server_convert_port(sa
, salen
));
136 err
= listen(s
, 10240);
139 dnet_log_err(n
, "Failed to listen at %s:%d",
140 dnet_server_convert_addr(sa
, salen
),
141 dnet_server_convert_port(sa
, salen
));
145 dnet_log(n
, DNET_LOG_INFO
, "Server is now listening at %s:%d.\n",
146 dnet_server_convert_addr(sa
, salen
),
147 dnet_server_convert_port(sa
, salen
));
149 fcntl(s
, F_SETFL
, O_NONBLOCK
);
150 fcntl(s
, F_SETFD
, FD_CLOEXEC
);
152 err
= dnet_socket_connect(n
, s
, sa
, salen
);
165 int dnet_fill_addr(struct dnet_addr
*addr
, const char *saddr
, const char *port
, const int family
,
166 const int sock_type
, const int proto
)
168 struct addrinfo
*ai
= NULL
, hint
;
171 memset(&hint
, 0, sizeof(struct addrinfo
));
173 hint
.ai_family
= family
;
174 hint
.ai_socktype
= sock_type
;
175 hint
.ai_protocol
= proto
;
177 err
= getaddrinfo(saddr
, port
, &hint
, &ai
);
178 if (err
|| ai
== NULL
)
181 if (addr
->addr_len
>= ai
->ai_addrlen
)
182 addr
->addr_len
= ai
->ai_addrlen
;
187 memcpy(addr
->addr
, ai
->ai_addr
, addr
->addr_len
);
195 int dnet_socket_create(struct dnet_node
*n
, struct dnet_config
*cfg
,
196 struct dnet_addr
*addr
, int listening
)
198 int s
, err
= -EINVAL
;
199 struct dnet_net_state
*st
;
201 if (cfg
->sock_type
!= n
->sock_type
)
202 cfg
->sock_type
= n
->sock_type
;
203 if (cfg
->proto
!= n
->proto
)
204 cfg
->proto
= n
->proto
;
206 err
= dnet_fill_addr(addr
, cfg
->addr
, cfg
->port
, cfg
->family
, cfg
->sock_type
, cfg
->proto
);
208 dnet_log(n
, DNET_LOG_ERROR
, "Failed to get address info for %s:%s, family: %d, err: %d: %s.\n",
209 cfg
->addr
, cfg
->port
, cfg
->family
, err
, strerror(-err
));
213 st
= dnet_state_search_by_addr(n
, addr
);
215 dnet_log(n
, DNET_LOG_ERROR
, "Address %s:%s already exists in route table\n", cfg
->addr
, cfg
->port
);
221 s
= dnet_socket_create_addr(n
, cfg
->sock_type
, cfg
->proto
, cfg
->family
,
222 (struct sockaddr
*)addr
->addr
, addr
->addr_len
, listening
);
234 static void dnet_state_clean(struct dnet_net_state
*st
)
236 struct rb_node
*rb_node
;
237 struct dnet_trans
*t
;
243 pthread_mutex_lock(&st
->trans_lock
);
244 rb_node
= rb_first(&st
->trans_root
);
246 t
= rb_entry(rb_node
, struct dnet_trans
, trans_entry
);
248 dnet_trans_remove_nolock(&st
->trans_root
, t
);
249 list_del_init(&t
->trans_list_entry
);
251 pthread_mutex_unlock(&st
->trans_lock
);
261 dnet_log(st
->n
, DNET_LOG_NOTICE
, "Cleaned state %s, transactions freed: %d\n", dnet_state_dump_addr(st
), num
);
265 * Eventually we may end up with proper reference counters here, but for now let's just copy the whole buf.
266 * Large data blocks are being sent through sendfile anyway, so it should not be _that_ costly operation.
268 static int dnet_io_req_queue(struct dnet_net_state
*st
, struct dnet_io_req
*orig
)
271 struct dnet_io_req
*r
;
275 buf
= r
= malloc(sizeof(struct dnet_io_req
) + orig
->dsize
+ orig
->hsize
);
280 memset(r
, 0, sizeof(struct dnet_io_req
));
283 if (orig
->header
&& orig
->hsize
) {
284 r
->header
= buf
+ sizeof(struct dnet_io_req
);
285 r
->hsize
= orig
->hsize
;
288 memcpy(r
->header
, orig
->header
, r
->hsize
);
291 if (orig
->data
&& orig
->dsize
) {
292 r
->data
= buf
+ sizeof(struct dnet_io_req
) + offset
;
293 r
->dsize
= orig
->dsize
;
296 memcpy(r
->data
, orig
->data
, r
->dsize
);
299 if (orig
->fd
>= 0 && orig
->fsize
) {
301 r
->close_on_exit
= orig
->close_on_exit
;
302 r
->local_offset
= orig
->local_offset
;
303 r
->fsize
= orig
->fsize
;
306 pthread_mutex_lock(&st
->send_lock
);
307 list_add_tail(&r
->req_entry
, &st
->send_list
);
310 dnet_schedule_send(st
);
311 pthread_mutex_unlock(&st
->send_lock
);
319 void dnet_io_req_free(struct dnet_io_req
*r
)
321 if (r
->fd
>= 0 && r
->fsize
&& r
->close_on_exit
)
326 static int dnet_wait(struct dnet_net_state
*st
, unsigned int events
, long timeout
)
335 err
= poll(&pfd
, 1, timeout
);
337 if (errno
== EAGAIN
|| errno
== EINTR
) {
342 dnet_log(st
->n
, DNET_LOG_ERROR
, "Failed to wait for descriptor: err: %d, socket: %d.\n",
353 if (pfd
.revents
& (POLLRDHUP
| POLLERR
| POLLHUP
| POLLNVAL
)) {
354 dnet_log(st
->n
, DNET_LOG_DEBUG
, "Connection reset by peer: sock: %d, revents: %x.\n",
355 st
->read_s
, pfd
.revents
);
360 if (pfd
.revents
& events
) {
365 dnet_log(st
->n
, DNET_LOG_ERROR
, "Socket reported error: sock: %d, revents: %x.\n",
366 st
->read_s
, pfd
.revents
);
369 if (st
->n
->need_exit
|| st
->need_exit
) {
370 dnet_log(st
->n
, DNET_LOG_ERROR
, "Need to exit.\n");
377 ssize_t
dnet_send_nolock(struct dnet_net_state
*st
, void *data
, uint64_t size
)
380 struct dnet_node
*n
= st
->n
;
383 err
= send(st
->write_s
, data
, size
, 0);
387 dnet_log_err(n
, "Failed to send packet: size: %llu, socket: %d",
388 (unsigned long long)size
, st
->write_s
);
393 dnet_log(n
, DNET_LOG_ERROR
, "Peer %s has dropped the connection: socket: %d.\n", dnet_state_dump_addr(st
), st
->write_s
);
400 st
->send_offset
+= err
;
408 ssize_t
dnet_send(struct dnet_net_state
*st
, void *data
, uint64_t size
)
410 struct dnet_io_req r
;
412 memset(&r
, 0, sizeof(r
));
417 return dnet_io_req_queue(st
, &r
);
420 ssize_t
dnet_send_data(struct dnet_net_state
*st
, void *header
, uint64_t hsize
, void *data
, uint64_t dsize
)
422 struct dnet_io_req r
;
424 memset(&r
, 0, sizeof(r
));
431 return dnet_io_req_queue(st
, &r
);
434 static ssize_t
dnet_send_fd_nolock(struct dnet_net_state
*st
, int fd
, uint64_t offset
, uint64_t dsize
)
439 err
= dnet_sendfile(st
, fd
, &offset
, dsize
);
444 dnet_log_err(st
->n
, "Looks like truncated file: fd: %d, offset: %llu, size: %llu.\n",
445 fd
, (unsigned long long)offset
, (unsigned long long)dsize
);
450 st
->send_offset
+= err
;
457 ssize_t
dnet_send_fd(struct dnet_net_state
*st
, void *header
, uint64_t hsize
,
458 int fd
, uint64_t offset
, uint64_t fsize
, int close_on_exit
)
460 struct dnet_io_req r
;
462 memset(&r
, 0, sizeof(r
));
466 r
.close_on_exit
= close_on_exit
;
467 r
.local_offset
= offset
;
470 return dnet_io_req_queue(st
, &r
);
473 static void dnet_trans_timestamp(struct dnet_net_state
*st
, struct dnet_trans
*t
)
475 gettimeofday(&t
->time
, NULL
);
476 t
->time
.tv_sec
+= st
->n
->wait_ts
.tv_sec
;
478 list_move_tail(&t
->trans_list_entry
, &st
->trans_list
);
481 int dnet_trans_send(struct dnet_trans
*t
, struct dnet_io_req
*req
)
483 struct dnet_net_state
*st
= req
->st
;
488 pthread_mutex_lock(&st
->trans_lock
);
489 err
= dnet_trans_insert_nolock(&st
->trans_root
, t
);
491 dnet_trans_timestamp(st
, t
);
492 pthread_mutex_unlock(&st
->trans_lock
);
496 err
= dnet_io_req_queue(st
, req
);
504 dnet_trans_remove(t
);
510 int dnet_recv(struct dnet_net_state
*st
, void *data
, unsigned int size
)
513 int wait
= st
->n
->wait_ts
.tv_sec
;
516 err
= dnet_wait(st
, POLLIN
, 1000);
518 if (err
== -EAGAIN
) {
527 err
= recv(st
->read_s
, data
, size
, 0);
529 dnet_log_err(st
->n
, "Failed to recv packet: size: %u", size
);
534 dnet_log(st
->n
, DNET_LOG_ERROR
, "dnet_recv: peer %s has disconnected.\n",
535 dnet_server_convert_dnet_addr(&st
->addr
));
541 wait
= st
->n
->wait_ts
.tv_sec
;
547 static struct dnet_trans
*dnet_trans_new(struct dnet_net_state
*st
)
549 struct dnet_trans
*t
;
551 t
= dnet_trans_alloc(st
->n
, 0);
561 int dnet_add_reconnect_state(struct dnet_node
*n
, struct dnet_addr
*addr
, unsigned int join_state
)
563 struct dnet_addr_storage
*a
, *it
;
566 if (!join_state
|| n
->need_exit
) {
568 dnet_log(n
, DNET_LOG_INFO
, "Do not add reconnection addr: %s, join state: %x.\n",
569 dnet_server_convert_dnet_addr(addr
), join_state
);
573 a
= malloc(sizeof(struct dnet_addr_storage
));
578 memset(a
, 0, sizeof(struct dnet_addr_storage
));
580 memcpy(&a
->addr
, addr
, sizeof(struct dnet_addr
));
581 a
->__join_state
= join_state
;
583 pthread_mutex_lock(&n
->reconnect_lock
);
584 list_for_each_entry(it
, &n
->reconnect_list
, reconnect_entry
) {
585 if (!memcmp(&it
->addr
, &a
->addr
, sizeof(struct dnet_addr
))) {
586 dnet_log(n
, DNET_LOG_INFO
, "Address already exists in reconnection array: addr: %s, join state: %x.\n",
587 dnet_server_convert_dnet_addr(&a
->addr
), join_state
);
594 dnet_log(n
, DNET_LOG_INFO
, "Added reconnection addr: %s, join state: %x.\n",
595 dnet_server_convert_dnet_addr(&a
->addr
), join_state
);
596 list_add_tail(&a
->reconnect_entry
, &n
->reconnect_list
);
598 pthread_mutex_unlock(&n
->reconnect_lock
);
607 static int dnet_trans_complete_forward(struct dnet_net_state
*state __unused
, struct dnet_cmd
*cmd
, void *priv
)
609 struct dnet_trans
*t
= priv
;
610 struct dnet_net_state
*orig
= t
->orig
;
613 if (!is_trans_destroyed(state
, cmd
)) {
614 uint64_t size
= cmd
->size
;
616 cmd
->trans
= t
->rcv_trans
| DNET_TRANS_REPLY
;
618 dnet_convert_cmd(cmd
);
620 err
= dnet_send_data(orig
, cmd
, sizeof(struct dnet_cmd
), cmd
+ 1, size
);
626 static int dnet_trans_forward(struct dnet_trans
*t
, struct dnet_io_req
*r
,
627 struct dnet_net_state
*orig
, struct dnet_net_state
*forward
)
629 struct dnet_cmd
*cmd
= r
->header
;
631 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
633 t
->rcv_trans
= cmd
->trans
;
634 cmd
->trans
= t
->cmd
.trans
= t
->trans
= atomic_inc(&orig
->n
->trans
);
636 dnet_convert_cmd(cmd
);
638 t
->command
= cmd
->cmd
;
639 t
->complete
= dnet_trans_complete_forward
;
642 t
->orig
= dnet_state_get(orig
);
643 t
->st
= dnet_state_get(forward
);
651 dnet_log(orig
->n
, DNET_LOG_INFO
, "%s: forwarding %s trans: %s -> %s, trans: %llu -> %llu\n",
652 dnet_dump_id(&t
->cmd
.id
), dnet_cmd_string(t
->command
),
653 dnet_server_convert_dnet_addr_raw(&orig
->addr
, saddr
, sizeof(saddr
)),
654 dnet_server_convert_dnet_addr_raw(&forward
->addr
, daddr
, sizeof(daddr
)),
655 (unsigned long long)t
->rcv_trans
, (unsigned long long)t
->trans
);
658 return dnet_trans_send(t
, r
);
661 int dnet_process_recv(struct dnet_net_state
*st
, struct dnet_io_req
*r
)
664 struct dnet_trans
*t
= NULL
;
665 struct dnet_node
*n
= st
->n
;
666 struct dnet_net_state
*forward_state
;
667 struct dnet_cmd
*cmd
= r
->header
;
669 if (cmd
->trans
& DNET_TRANS_REPLY
) {
670 uint64_t tid
= cmd
->trans
& ~DNET_TRANS_REPLY
;
672 pthread_mutex_lock(&st
->trans_lock
);
673 t
= dnet_trans_search(&st
->trans_root
, tid
);
675 if (!(cmd
->flags
& DNET_FLAGS_MORE
)) {
676 dnet_trans_remove_nolock(&st
->trans_root
, t
);
677 list_del_init(&t
->trans_list_entry
);
679 dnet_trans_timestamp(st
, t
);
681 pthread_mutex_unlock(&st
->trans_lock
);
684 dnet_log(n
, DNET_LOG_ERROR
, "%s: could not find transaction for reply: trans %llu.\n",
685 dnet_dump_id(&cmd
->id
), (unsigned long long)tid
);
691 t
->complete(t
->st
, cmd
, t
->priv
);
694 if (!(cmd
->flags
& DNET_FLAGS_MORE
)) {
695 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
701 forward_state
= dnet_state_get_first(n
, &cmd
->id
);
702 if (!forward_state
|| forward_state
== st
|| forward_state
== n
->st
||
703 (st
->rcv_cmd
.flags
& DNET_FLAGS_DIRECT
)) {
704 dnet_state_put(forward_state
);
706 err
= dnet_process_cmd_raw(st
, cmd
, r
->data
);
710 t
= dnet_trans_new(st
);
713 goto err_out_put_forward
;
716 err
= dnet_trans_forward(t
, r
, st
, forward_state
);
718 goto err_out_destroy
;
720 dnet_state_put(forward_state
);
722 err
= dnet_process_cmd_raw(st
, cmd
, r
->data
);
730 dnet_state_put(forward_state
);
733 dnet_log(n
, DNET_LOG_ERROR
, "%s: error during received transaction processing: trans %llu, reply: %d, error: %d.\n",
734 dnet_dump_id(&t
->cmd
.id
), (t
->cmd
.trans
& ~DNET_TRANS_REPLY
),
735 !!(t
->cmd
.trans
& DNET_TRANS_REPLY
), err
);
739 void dnet_state_remove_nolock(struct dnet_net_state
*st
)
741 list_del_init(&st
->state_entry
);
742 list_del_init(&st
->storage_state_entry
);
743 dnet_idc_destroy_nolock(st
);
746 static void dnet_state_remove(struct dnet_net_state
*st
)
748 struct dnet_node
*n
= st
->n
;
750 pthread_mutex_lock(&n
->state_lock
);
751 dnet_state_remove_nolock(st
);
752 pthread_mutex_unlock(&n
->state_lock
);
755 void dnet_state_reset(struct dnet_net_state
*st
)
757 dnet_state_remove(st
);
759 pthread_mutex_lock(&st
->send_lock
);
761 st
->need_exit
= -ECONNRESET
;
762 dnet_unschedule_send(st
);
763 pthread_mutex_unlock(&st
->send_lock
);
765 dnet_unschedule_recv(st
);
767 dnet_add_reconnect_state(st
->n
, &st
->addr
, st
->__join_state
);
769 dnet_state_clean(st
);
773 void dnet_sock_close(int s
)
779 void dnet_set_sockopt(int s
)
785 setsockopt(s
, SOL_SOCKET
, SO_KEEPALIVE
, &opt
, 4);
788 setsockopt(s
, IPPROTO_TCP
, TCP_KEEPCNT
, &opt
, 4);
790 setsockopt(s
, IPPROTO_TCP
, TCP_KEEPIDLE
, &opt
, 4);
792 setsockopt(s
, IPPROTO_TCP
, TCP_KEEPINTVL
, &opt
, 4);
797 setsockopt(s
, SOL_SOCKET
, SO_LINGER
, &l
, sizeof(l
));
799 fcntl(s
, F_SETFD
, FD_CLOEXEC
);
800 fcntl(s
, F_SETFL
, O_NONBLOCK
);
803 int dnet_setup_control_nolock(struct dnet_net_state
*st
)
805 struct dnet_node
*n
= st
->n
;
806 struct dnet_io
*io
= n
->io
;
809 if (st
->epoll_fd
== -1) {
810 pos
= io
->net_thread_pos
;
811 if (++io
->net_thread_pos
>= io
->net_thread_num
)
812 io
->net_thread_pos
= 0;
813 st
->epoll_fd
= io
->net
[pos
].epoll_fd
;
815 err
= dnet_schedule_recv(st
);
817 goto err_out_unschedule
;
823 dnet_unschedule_send(st
);
824 dnet_unschedule_recv(st
);
827 list_del_init(&st
->storage_state_entry
);
831 static int dnet_auth_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv __unused
)
838 /* this means this callback at least has state and cmd */
839 if (!is_trans_destroyed(state
, cmd
)) {
842 if (cmd
->status
== 0) {
843 dnet_log(n
, DNET_LOG_INFO
, "%s: authentication request suceeded\n", dnet_state_dump_addr(state
));
847 dnet_log(n
, DNET_LOG_ERROR
, "%s: authentication request failed: %d\n", dnet_state_dump_addr(state
), cmd
->status
);
849 state
->__join_state
= 0;
850 dnet_state_reset(state
);
856 static int dnet_auth_send(struct dnet_net_state
*st
)
858 struct dnet_node
*n
= st
->n
;
859 struct dnet_trans_control ctl
;
862 memset(&a
, 0, sizeof(struct dnet_auth
));
864 memcpy(a
.cookie
, n
->cookie
, DNET_AUTH_COOKIE_SIZE
);
865 dnet_convert_auth(&a
);
867 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
869 ctl
.cmd
= DNET_CMD_AUTH
;
870 ctl
.cflags
= DNET_FLAGS_DIRECT
| DNET_FLAGS_NEED_ACK
;
871 ctl
.size
= sizeof(struct dnet_auth
);
874 ctl
.complete
= dnet_auth_complete
;
876 return dnet_trans_alloc_send_state(st
, &ctl
);
879 struct dnet_net_state
*dnet_state_create(struct dnet_node
*n
,
880 int group_id
, struct dnet_raw_id
*ids
, int id_num
,
881 struct dnet_addr
*addr
, int s
, int *errp
, int join
,
882 int (* process
)(struct dnet_net_state
*st
, struct epoll_event
*ev
))
885 struct dnet_net_state
*st
;
888 st
= dnet_state_search_by_addr(n
, addr
);
896 st
= malloc(sizeof(struct dnet_net_state
));
900 memset(st
, 0, sizeof(struct dnet_net_state
));
903 st
->write_s
= dup(s
);
904 if (st
->write_s
< 0) {
906 dnet_log_err(n
, "%s: failed to duplicate socket", dnet_server_convert_dnet_addr(addr
));
910 fcntl(st
->write_s
, F_SETFD
, FD_CLOEXEC
);
914 st
->process
= process
;
917 st
->weight
= DNET_STATE_MAX_WEIGHT
/ 2;
918 st
->median_read_time
= 1000; /* useconds for start */
920 INIT_LIST_HEAD(&st
->state_entry
);
921 INIT_LIST_HEAD(&st
->storage_state_entry
);
923 st
->trans_root
= RB_ROOT
;
924 INIT_LIST_HEAD(&st
->trans_list
);
928 err
= pthread_mutex_init(&st
->trans_lock
, NULL
);
931 dnet_log_err(n
, "Failed to initialize transaction mutex: %d", err
);
932 goto err_out_dup_destroy
;
935 INIT_LIST_HEAD(&st
->send_list
);
936 err
= pthread_mutex_init(&st
->send_lock
, NULL
);
939 dnet_log_err(n
, "Failed to initialize send mutex: %d", err
);
940 goto err_out_trans_destroy
;
943 atomic_init(&st
->refcnt
, 1);
945 memcpy(&st
->addr
, addr
, sizeof(struct dnet_addr
));
947 dnet_schedule_command(st
);
948 st
->__join_state
= join
;
950 if (n
->client_prio
) {
951 err
= setsockopt(st
->read_s
, IPPROTO_IP
, IP_TOS
, &n
->client_prio
, 4);
954 dnet_log_err(n
, "could not set read client prio %d", n
->client_prio
);
956 err
= setsockopt(st
->write_s
, IPPROTO_IP
, IP_TOS
, &n
->client_prio
, 4);
959 dnet_log_err(n
, "could not set write client prio %d", n
->client_prio
);
963 dnet_log(n
, DNET_LOG_INFO
, "%s: client net TOS value set to %d\n",
964 dnet_server_convert_dnet_addr(addr
), n
->client_prio
);
969 * it is possible that state can be removed after inserted into route table,
970 * so we should grab a reference here and drop it after we are done
975 err
= dnet_idc_create(st
, group_id
, ids
, id_num
);
977 goto err_out_send_destroy
;
979 if ((st
->__join_state
== DNET_JOIN
) && n
->st
) {
980 pthread_mutex_lock(&n
->state_lock
);
981 err
= dnet_state_join_nolock(st
);
982 pthread_mutex_unlock(&n
->state_lock
);
984 err
= dnet_auth_send(st
);
987 pthread_mutex_lock(&n
->state_lock
);
988 list_add_tail(&st
->state_entry
, &n
->empty_state_list
);
989 list_add_tail(&st
->storage_state_entry
, &n
->storage_state_list
);
991 err
= dnet_setup_control_nolock(st
);
994 pthread_mutex_unlock(&n
->state_lock
);
997 if (atomic_read(&st
->refcnt
) == 1) {
1010 list_del_init(&st
->state_entry
);
1011 pthread_mutex_unlock(&n
->state_lock
);
1012 err_out_send_destroy
:
1014 pthread_mutex_destroy(&st
->send_lock
);
1015 err_out_trans_destroy
:
1016 pthread_mutex_destroy(&st
->trans_lock
);
1017 err_out_dup_destroy
:
1018 dnet_sock_close(st
->write_s
);
1026 dnet_log(n
, DNET_LOG_ERROR
, "%s: state already exists.\n", dnet_server_convert_dnet_addr(addr
));
1031 int dnet_state_num(struct dnet_node
*n
)
1033 struct dnet_net_state
*st
;
1034 struct dnet_group
*g
;
1037 pthread_mutex_lock(&n
->state_lock
);
1038 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1039 list_for_each_entry(st
, &g
->state_list
, state_entry
)
1042 pthread_mutex_unlock(&n
->state_lock
);
1047 static void dnet_state_send_clean(struct dnet_net_state
*st
)
1049 struct dnet_io_req
*r
, *tmp
;
1051 list_for_each_entry_safe(r
, tmp
, &st
->send_list
, req_entry
) {
1052 list_del(&r
->req_entry
);
1053 dnet_io_req_free(r
);
1057 void dnet_state_destroy(struct dnet_net_state
*st
)
1059 dnet_state_remove(st
);
1061 if (st
->read_s
>= 0) {
1062 dnet_sock_close(st
->read_s
);
1063 dnet_sock_close(st
->write_s
);
1066 dnet_state_clean(st
);
1068 dnet_state_send_clean(st
);
1070 pthread_mutex_destroy(&st
->send_lock
);
1071 pthread_mutex_destroy(&st
->trans_lock
);
1073 dnet_log(st
->n
, DNET_LOG_NOTICE
, "Freeing state %s, socket: %d/%d.\n",
1074 dnet_server_convert_dnet_addr(&st
->addr
), st
->read_s
, st
->write_s
);
1079 int dnet_send_reply(void *state
, struct dnet_cmd
*cmd
, void *odata
, unsigned int size
, int more
)
1081 struct dnet_net_state
*st
= state
;
1086 if (st
== st
->n
->st
)
1089 c
= malloc(sizeof(struct dnet_cmd
) + size
);
1093 memset(c
, 0, sizeof(struct dnet_cmd
) + size
);
1098 if ((cmd
->flags
& DNET_FLAGS_NEED_ACK
) || more
)
1099 c
->flags
|= DNET_FLAGS_MORE
;
1102 c
->trans
|= DNET_TRANS_REPLY
;
1105 memcpy(data
, odata
, size
);
1107 dnet_log(st
->n
, DNET_LOG_NOTICE
, "%s: %s: reply: size: %u, cflags: %llx.\n",
1108 dnet_dump_id(&cmd
->id
), dnet_cmd_string(cmd
->cmd
), size
, (unsigned long long)c
->flags
);
1110 dnet_convert_cmd(c
);
1112 err
= dnet_send(st
, c
, sizeof(struct dnet_cmd
) + size
);
1118 int dnet_send_request(struct dnet_net_state
*st
, struct dnet_io_req
*r
)
1122 size_t offset
= st
->send_offset
;
1124 /* Use TCP_CORK to send headers and packet body in one piece */
1126 setsockopt(st
->write_s
, IPPROTO_TCP
, TCP_CORK
, &cork
, 4);
1128 if (r
->hsize
&& r
->header
&& st
->send_offset
< r
->hsize
) {
1129 err
= dnet_send_nolock(st
, r
->header
+ offset
, r
->hsize
- offset
);
1134 if (r
->dsize
&& r
->data
&& st
->send_offset
< (r
->dsize
+ r
->hsize
)) {
1135 offset
= st
->send_offset
- r
->hsize
;
1136 err
= dnet_send_nolock(st
, r
->data
+ offset
, r
->dsize
- offset
);
1141 if (r
->fd
>= 0 && r
->fsize
&& st
->send_offset
< (r
->dsize
+ r
->hsize
+ r
->fsize
)) {
1142 offset
= st
->send_offset
- r
->dsize
- r
->hsize
;
1143 err
= dnet_send_fd_nolock(st
, r
->fd
, r
->local_offset
+ offset
, r
->fsize
- offset
);
1148 if (r
->hsize
> sizeof(struct dnet_cmd
)) {
1149 struct dnet_cmd
*cmd
= r
->header
;
1150 int nonblocking
= !!(cmd
->flags
& DNET_FLAGS_NOLOCK
);
1152 dnet_log(st
->n
, DNET_LOG_DEBUG
, "%s: %s: SENT %s cmd: %s: cmd-size: %llu, nonblocking: %d\n",
1153 dnet_state_dump_addr(st
), dnet_dump_id(r
->header
),
1154 nonblocking
? "nonblocking" : "blocking",
1155 dnet_cmd_string(cmd
->cmd
),
1156 (unsigned long long)cmd
->size
, nonblocking
);
1160 if (st
->send_offset
== (r
->dsize
+ r
->hsize
+ r
->fsize
)) {
1161 pthread_mutex_lock(&st
->send_lock
);
1162 list_del(&r
->req_entry
);
1163 pthread_mutex_unlock(&st
->send_lock
);
1165 dnet_io_req_free(r
);
1166 st
->send_offset
= 0;
1169 if (err
&& err
!= -EAGAIN
) {
1170 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: setting send need_exit to %d\n", dnet_state_dump_addr(st
), err
);
1171 st
->need_exit
= err
;
1175 setsockopt(st
->write_s
, IPPROTO_TCP
, TCP_CORK
, &cork
, 4);
1180 int dnet_parse_addr(char *addr
, struct dnet_config
*cfg
)
1184 fam
= strrchr(addr
, DNET_CONF_ADDR_DELIM
);
1186 goto err_out_print_wrong_param
;
1189 goto err_out_print_wrong_param
;
1191 cfg
->family
= atoi(fam
);
1193 port
= strrchr(addr
, DNET_CONF_ADDR_DELIM
);
1195 goto err_out_print_wrong_param
;
1198 goto err_out_print_wrong_param
;
1200 memset(cfg
->addr
, 0, sizeof(cfg
->addr
));
1201 memset(cfg
->port
, 0, sizeof(cfg
->port
));
1203 snprintf(cfg
->addr
, sizeof(cfg
->addr
), "%s", addr
);
1204 snprintf(cfg
->port
, sizeof(cfg
->port
), "%s", port
);
1206 if (!strcmp(addr
, "hostname")) {
1209 err
= gethostname(cfg
->addr
, sizeof(cfg
->addr
));
1212 fprintf(stderr
, "Could not get hostname: %s %d\n", strerror(-err
), err
);
1213 goto err_out_print_wrong_param
;
1219 err_out_print_wrong_param
:
1220 fprintf(stderr
, "Wrong address parameter '%s', should be 'addr%cport%cfamily'.\n",
1221 addr
, DNET_CONF_ADDR_DELIM
, DNET_CONF_ADDR_DELIM
);