/*
 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
27 static char *dnet_work_io_mode_string
[] = {
28 [DNET_WORK_IO_MODE_BLOCKING
] = "BLOCKING",
29 [DNET_WORK_IO_MODE_NONBLOCKING
] = "NONBLOCKING",
32 static char *dnet_work_io_mode_str(int mode
)
34 if (mode
< 0 || mode
>= (int)ARRAY_SIZE(dnet_work_io_mode_string
))
37 return dnet_work_io_mode_string
[mode
];
40 static void dnet_work_pool_cleanup(struct dnet_work_pool
*pool
)
42 struct dnet_io_req
*r
, *tmp
;
43 struct dnet_work_io
*wio
, *wio_tmp
;
45 list_for_each_entry_safe(wio
, wio_tmp
, &pool
->wio_list
, wio_entry
) {
46 pthread_join(wio
->tid
, NULL
);
47 list_del(&wio
->wio_entry
);
52 list_for_each_entry_safe(r
, tmp
, &pool
->list
, req_entry
) {
53 list_del(&r
->req_entry
);
57 pthread_cond_destroy(&pool
->wait
);
58 pthread_mutex_destroy(&pool
->lock
);
62 static int dnet_work_pool_grow(struct dnet_node
*n
, struct dnet_work_pool
*pool
, int num
, void *(* process
)(void *))
65 struct dnet_work_io
*wio
, *tmp
;
67 pthread_mutex_lock(&pool
->lock
);
69 for (i
= 0; i
< num
; ++i
) {
70 wio
= malloc(sizeof(struct dnet_work_io
));
73 goto err_out_io_threads
;
76 wio
->thread_index
= i
;
78 list_add_tail(&wio
->wio_entry
, &pool
->wio_list
);
80 err
= pthread_create(&wio
->tid
, NULL
, process
, wio
);
83 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create IO thread: %d\n", err
);
84 goto err_out_io_threads
;
88 dnet_log(n
, DNET_LOG_INFO
, "Grew %s pool by: %d -> %d IO threads\n",
89 dnet_work_io_mode_str(pool
->mode
), pool
->num
, pool
->num
+ num
);
91 atomic_add(&pool
->avail
, num
);
93 pthread_mutex_unlock(&pool
->lock
);
98 list_for_each_entry_safe(wio
, tmp
, &pool
->wio_list
, wio_entry
) {
99 pthread_join(wio
->tid
, NULL
);
100 list_del(&wio
->wio_entry
);
104 pthread_mutex_unlock(&pool
->lock
);
109 static struct dnet_work_pool
*dnet_work_pool_alloc(struct dnet_node
*n
, int num
, int mode
, void *(* process
)(void *))
111 struct dnet_work_pool
*pool
;
114 pool
= malloc(sizeof(struct dnet_work_pool
));
120 memset(pool
, 0, sizeof(struct dnet_work_pool
));
123 atomic_set(&pool
->avail
, 0);
126 INIT_LIST_HEAD(&pool
->list
);
127 INIT_LIST_HEAD(&pool
->wio_list
);
129 err
= pthread_mutex_init(&pool
->lock
, NULL
);
135 err
= pthread_cond_init(&pool
->wait
, NULL
);
138 goto err_out_mutex_destroy
;
141 err
= dnet_work_pool_grow(n
, pool
, num
, process
);
143 goto err_out_cond_destroy
;
147 err_out_cond_destroy
:
148 pthread_cond_destroy(&pool
->wait
);
149 err_out_mutex_destroy
:
150 pthread_mutex_destroy(&pool
->lock
);
157 static void *dnet_io_process(void *data_
);
158 static void dnet_schedule_io(struct dnet_node
*n
, struct dnet_io_req
*r
)
160 struct dnet_io
*io
= n
->io
;
161 struct dnet_cmd
*cmd
= r
->header
;
162 int nonblocking
= !!(cmd
->flags
& DNET_FLAGS_NOLOCK
);
163 struct dnet_work_pool
*pool
= io
->recv_pool
;
166 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
167 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
),
168 (unsigned long long)cmd
->size
, nonblocking
);
169 } else if ((cmd
->size
== 0) && !(cmd
->flags
& DNET_FLAGS_MORE
) && (cmd
->trans
& DNET_TRANS_REPLY
)) {
170 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV ACK: %s: nonblocking: %d\n",
171 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
), nonblocking
);
173 unsigned long long tid
= cmd
->trans
& ~DNET_TRANS_REPLY
;
174 int reply
= !!(cmd
->trans
& DNET_TRANS_REPLY
);
176 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV: %s: nonblocking: %d, cmd-size: %llu, cflags: %llx, trans: %lld, reply: %d\n",
177 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
), nonblocking
,
178 (unsigned long long)cmd
->size
, (unsigned long long)cmd
->flags
, tid
, reply
);
183 pool
= io
->recv_pool_nb
;
185 #define cmd_is_exec_match(__cmd) (((__cmd)->cmd == DNET_CMD_EXEC) && ((__cmd)->size >= sizeof(struct sph)) && !((__cmd)->trans & DNET_TRANS_REPLY))
187 if (!list_empty(&pool
->list
) && cmd_is_exec_match(cmd
)) {
188 int pool_has_blocked_sph
= 0;
189 struct dnet_io_req
*tmp
;
191 int edge_num
= pool
->num
/ 4 + 1;
193 pthread_mutex_lock(&pool
->lock
);
194 list_for_each_entry(tmp
, &pool
->list
, req_entry
) {
195 struct dnet_cmd
*tmp_cmd
= tmp
->header
;
196 unsigned long long tid
= tmp_cmd
->trans
& ~DNET_TRANS_REPLY
;
197 int reply
= !!(tmp_cmd
->trans
& DNET_TRANS_REPLY
);
198 unsigned long long sph_flags
= 0;
201 if (cmd_is_exec_match(tmp_cmd
)) {
202 sph
= (struct sph
*)tmp
->data
;
203 sph_flags
= sph
->flags
;
208 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: pool-grow: %s: cmd-size: %llu, cflags: %llx, "
209 "trans: %lld, reply: %d, sph-flags: %llx (match: %d), avail: %d\n",
210 dnet_state_dump_addr(tmp
->st
), dnet_dump_id(tmp
->header
), dnet_cmd_string(tmp_cmd
->cmd
),
211 (unsigned long long)tmp_cmd
->size
, (unsigned long long)tmp_cmd
->flags
,
212 tid
, reply
, sph_flags
, sph_match
, atomic_read(&pool
->avail
));
214 if (cmd_is_exec_match(tmp_cmd
)) {
215 sph
= (struct sph
*)tmp
->data
;
216 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
) {
217 pool_has_blocked_sph
= 1;
222 pthread_mutex_unlock(&pool
->lock
);
224 sph
= (struct sph
*)r
->data
;
225 if ((sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
) && pool_has_blocked_sph
&& (atomic_read(&pool
->avail
) < edge_num
)) {
226 dnet_work_pool_grow(n
, pool
, edge_num
, dnet_io_process
);
230 pthread_mutex_lock(&pool
->lock
);
231 list_add_tail(&r
->req_entry
, &pool
->list
);
232 pthread_cond_broadcast(&pool
->wait
);
233 pthread_mutex_unlock(&pool
->lock
);
237 void dnet_schedule_command(struct dnet_net_state
*st
)
239 st
->rcv_flags
= DNET_IO_CMD
;
243 struct dnet_cmd
*c
= &st
->rcv_cmd
;
244 unsigned long long tid
= c
->trans
& ~DNET_TRANS_REPLY
;
245 dnet_log(st
->n
, DNET_LOG_DEBUG
, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
246 (unsigned long long)c
->size
, tid
, tid
!= c
->trans
, st
->rcv_data
);
252 st
->rcv_end
= sizeof(struct dnet_cmd
);
256 static int dnet_process_recv_single(struct dnet_net_state
*st
)
258 struct dnet_node
*n
= st
->n
;
259 struct dnet_io_req
*r
;
266 * Reading command first.
268 if (st
->rcv_flags
& DNET_IO_CMD
)
272 data
+= st
->rcv_offset
;
273 size
= st
->rcv_end
- st
->rcv_offset
;
276 err
= recv(st
->read_s
, data
, size
, 0);
279 if (errno
!= EAGAIN
&& errno
!= EINTR
) {
281 dnet_log_err(n
, "failed to receive data, socket: %d", st
->read_s
);
289 dnet_log(n
, DNET_LOG_ERROR
, "Peer %s has disconnected.\n",
290 dnet_server_convert_dnet_addr(&st
->addr
));
295 st
->rcv_offset
+= err
;
298 if (st
->rcv_offset
!= st
->rcv_end
)
301 if (st
->rcv_flags
& DNET_IO_CMD
) {
302 unsigned long long tid
;
303 struct dnet_cmd
*c
= &st
->rcv_cmd
;
307 tid
= c
->trans
& ~DNET_TRANS_REPLY
;
309 dnet_log(n
, DNET_LOG_DEBUG
, "%s: received trans: %llu / %llx, "
310 "reply: %d, size: %llu, flags: %llx, status: %d.\n",
311 dnet_dump_id(&c
->id
), tid
, (unsigned long long)c
->trans
,
312 !!(c
->trans
& DNET_TRANS_REPLY
),
313 (unsigned long long)c
->size
, (unsigned long long)c
->flags
, c
->status
);
315 r
= malloc(c
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_req
));
320 memset(r
, 0, sizeof(struct dnet_io_req
));
323 r
->hsize
= sizeof(struct dnet_cmd
);
324 memcpy(r
->header
, &st
->rcv_cmd
, sizeof(struct dnet_cmd
));
327 st
->rcv_offset
= sizeof(struct dnet_io_req
) + sizeof(struct dnet_cmd
);
328 st
->rcv_end
= st
->rcv_offset
+ c
->size
;
329 st
->rcv_flags
&= ~DNET_IO_CMD
;
332 r
->data
= r
->header
+ sizeof(struct dnet_cmd
);
336 * We read the command header, now get the data.
345 dnet_schedule_command(st
);
347 r
->st
= dnet_state_get(st
);
349 dnet_schedule_io(n
, r
);
353 if (err
!= -EAGAIN
&& err
!= -EINTR
)
354 dnet_schedule_command(st
);
359 int dnet_state_accept_process(struct dnet_net_state
*orig
, struct epoll_event
*ev __unused
)
361 struct dnet_node
*n
= orig
->n
;
363 struct dnet_addr addr
;
364 struct dnet_net_state
*st
;
366 memset(&addr
, 0, sizeof(addr
));
368 addr
.addr_len
= sizeof(addr
.addr
);
369 cs
= accept(orig
->read_s
, (struct sockaddr
*)&addr
.addr
, &addr
.addr_len
);
373 dnet_log_err(n
, "failed to accept new client at %s", dnet_state_dump_addr(orig
));
377 dnet_set_sockopt(cs
);
379 st
= dnet_state_create(n
, 0, NULL
, 0, &addr
, cs
, &err
, 0, dnet_state_net_process
);
381 dnet_log(n
, DNET_LOG_ERROR
, "%s: Failed to create state for accepted client: %s [%d]\n",
382 dnet_server_convert_dnet_addr(&addr
), strerror(-err
), -err
);
387 dnet_log(n
, DNET_LOG_INFO
, "Accepted client %s, socket: %d.\n",
388 dnet_server_convert_dnet_addr(&addr
), cs
);
391 /* socket is closed in dnet_state_create() */
396 void dnet_unschedule_send(struct dnet_net_state
*st
)
398 struct epoll_event ev
;
400 ev
.events
= EPOLLOUT
;
403 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->write_s
, &ev
);
406 void dnet_unschedule_recv(struct dnet_net_state
*st
)
408 struct epoll_event ev
;
413 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->read_s
, &ev
);
416 static int dnet_process_send_single(struct dnet_net_state
*st
)
418 struct dnet_io_req
*r
= NULL
;
424 pthread_mutex_lock(&st
->send_lock
);
425 if (!list_empty(&st
->send_list
)) {
426 r
= list_first_entry(&st
->send_list
, struct dnet_io_req
, req_entry
);
428 dnet_unschedule_send(st
);
430 pthread_mutex_unlock(&st
->send_lock
);
437 err
= dnet_send_request(st
, r
);
446 static int dnet_schedule_network_io(struct dnet_net_state
*st
, int send
)
448 struct epoll_event ev
;
452 ev
.events
= EPOLLOUT
;
460 err
= epoll_ctl(st
->epoll_fd
, EPOLL_CTL_ADD
, fd
, &ev
);
464 if (err
== -EEXIST
) {
467 dnet_log_err(st
->n
, "%s: failed to add %s event", dnet_state_dump_addr(st
), send
? "SEND" : "RECV");
/*
 * Schedule the state for sending: forwards to dnet_schedule_network_io()
 * with the send flag set and returns its result unchanged.
 */
int dnet_schedule_send(struct dnet_net_state *st)
{
	const int want_send = 1;

	return dnet_schedule_network_io(st, want_send);
}
/*
 * Schedule the state for receiving: forwards to dnet_schedule_network_io()
 * with the send flag cleared and returns its result unchanged.
 */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	const int want_send = 0;

	return dnet_schedule_network_io(st, want_send);
}
484 int dnet_state_net_process(struct dnet_net_state
*st
, struct epoll_event
*ev
)
486 int err
= -ECONNRESET
;
488 if (ev
->events
& EPOLLIN
) {
489 err
= dnet_process_recv_single(st
);
490 if (err
&& (err
!= -EAGAIN
))
493 if (ev
->events
& EPOLLOUT
) {
494 err
= dnet_process_send_single(st
);
495 if (err
&& (err
!= -EAGAIN
))
499 if (ev
->events
& (EPOLLHUP
| EPOLLERR
)) {
500 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: received error event mask %x\n", dnet_state_dump_addr(st
), ev
->events
);
507 static void *dnet_io_process_network(void *data_
)
509 struct dnet_net_io
*nio
= data_
;
510 struct dnet_node
*n
= nio
->n
;
511 struct dnet_net_state
*st
;
512 struct epoll_event ev
;
514 struct dnet_trans
*t
, *tmp
;
516 struct list_head head
;
518 dnet_set_name("net_pool");
520 while (!n
->need_exit
) {
521 err
= epoll_wait(nio
->epoll_fd
, &ev
, 1, 1000);
528 if (err
== -EAGAIN
|| err
== -EINTR
)
531 dnet_log_err(n
, "Failed to wait for IO fds");
537 st
->epoll_fd
= nio
->epoll_fd
;
541 err
= st
->process(st
, &ev
);
545 if (err
== -EAGAIN
&& st
->stall
< DNET_DEFAULT_STALL_TRANSACTIONS
)
548 if (err
< 0 || st
->stall
>= DNET_DEFAULT_STALL_TRANSACTIONS
) {
549 dnet_state_reset(st
);
558 gettimeofday(&tv
, NULL
);
560 INIT_LIST_HEAD(&head
);
562 pthread_mutex_lock(&st
->trans_lock
);
563 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_list_entry
) {
564 if (t
->time
.tv_sec
>= tv
.tv_sec
)
567 dnet_trans_remove_nolock(&st
->trans_root
, t
);
568 list_move(&t
->trans_list_entry
, &head
);
570 pthread_mutex_unlock(&st
->trans_lock
);
572 list_for_each_entry_safe(t
, tmp
, &head
, trans_list_entry
) {
573 list_del_init(&t
->trans_list_entry
);
577 t
->cmd
.status
= -ETIMEDOUT
;
579 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: destructing trans: %llu on TIMEOUT\n",
580 dnet_state_dump_addr(st
), (unsigned long long)t
->trans
);
583 t
->complete(st
, &t
->cmd
, t
->priv
);
589 return &n
->need_exit
;
592 static void dnet_io_cleanup_states(struct dnet_node
*n
)
594 struct dnet_net_state
*st
, *tmp
;
596 list_for_each_entry_safe(st
, tmp
, &n
->storage_state_list
, storage_state_entry
) {
597 dnet_state_reset(st
);
601 struct dnet_io_process_data
{
606 static void *dnet_io_process(void *data_
)
608 struct dnet_work_io
*wio
= data_
;
609 struct dnet_work_pool
*pool
= wio
->pool
;
610 struct dnet_node
*n
= pool
->n
;
611 struct dnet_net_state
*st
;
614 struct dnet_io_req
*r
;
617 dnet_set_name("io_pool");
619 while (!n
->need_exit
) {
623 gettimeofday(&tv
, NULL
);
624 ts
.tv_sec
= tv
.tv_sec
+ 1;
625 ts
.tv_nsec
= tv
.tv_usec
* 1000;
627 pthread_mutex_lock(&pool
->lock
);
629 if (!list_empty(&pool
->list
)) {
630 r
= list_first_entry(&pool
->list
, struct dnet_io_req
, req_entry
);
632 err
= pthread_cond_timedwait(&pool
->wait
, &pool
->lock
, &ts
);
633 if (!list_empty(&pool
->list
)) {
634 r
= list_first_entry(&pool
->list
, struct dnet_io_req
, req_entry
);
640 list_del_init(&r
->req_entry
);
641 atomic_dec(&pool
->avail
);
643 pthread_mutex_unlock(&pool
->lock
);
650 dnet_log(n
, DNET_LOG_DEBUG
, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, mode: %s\n",
651 dnet_state_dump_addr(st
), dnet_dump_id(r
->header
), r
, r
->hsize
, r
->dsize
, dnet_work_io_mode_str(pool
->mode
));
653 err
= dnet_process_recv(st
, r
);
658 atomic_inc(&pool
->avail
);
664 int dnet_io_init(struct dnet_node
*n
, struct dnet_config
*cfg
)
668 int io_size
= sizeof(struct dnet_io
) + sizeof(struct dnet_net_io
) * cfg
->net_thread_num
;
670 io
= malloc(io_size
);
676 memset(io
, 0, io_size
);
678 io
->net_thread_num
= cfg
->net_thread_num
;
679 io
->net_thread_pos
= 0;
680 io
->net
= (struct dnet_net_io
*)(io
+ 1);
682 io
->recv_pool
= dnet_work_pool_alloc(n
, cfg
->io_thread_num
, DNET_WORK_IO_MODE_BLOCKING
, dnet_io_process
);
683 if (!io
->recv_pool
) {
688 io
->recv_pool_nb
= dnet_work_pool_alloc(n
, cfg
->nonblocking_io_thread_num
, DNET_WORK_IO_MODE_NONBLOCKING
, dnet_io_process
);
689 if (!io
->recv_pool_nb
) {
691 goto err_out_free_recv_pool
;
694 for (i
=0; i
<io
->net_thread_num
; ++i
) {
695 struct dnet_net_io
*nio
= &io
->net
[i
];
699 nio
->epoll_fd
= epoll_create(10000);
700 if (nio
->epoll_fd
< 0) {
702 dnet_log_err(n
, "Failed to create epoll fd");
703 goto err_out_net_destroy
;
706 fcntl(nio
->epoll_fd
, F_SETFD
, FD_CLOEXEC
);
707 fcntl(nio
->epoll_fd
, F_SETFL
, O_NONBLOCK
);
709 err
= pthread_create(&nio
->tid
, NULL
, dnet_io_process_network
, nio
);
711 close(nio
->epoll_fd
);
713 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create network processing thread: %d\n", err
);
714 goto err_out_net_destroy
;
723 pthread_join(io
->net
[i
].tid
, NULL
);
724 close(io
->net
[i
].epoll_fd
);
727 dnet_work_pool_cleanup(io
->recv_pool_nb
);
728 err_out_free_recv_pool
:
729 dnet_work_pool_cleanup(io
->recv_pool
);
737 void dnet_io_exit(struct dnet_node
*n
)
739 struct dnet_io
*io
= n
->io
;
744 for (i
=0; i
<io
->net_thread_num
; ++i
) {
745 pthread_join(io
->net
[i
].tid
, NULL
);
746 close(io
->net
[i
].epoll_fd
);
749 dnet_work_pool_cleanup(io
->recv_pool_nb
);
750 dnet_work_pool_cleanup(io
->recv_pool
);
752 dnet_io_cleanup_states(n
);