2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
27 static char *dnet_work_io_mode_string
[] = {
28 [DNET_WORK_IO_MODE_BLOCKING
] = "BLOCKING",
29 [DNET_WORK_IO_MODE_NONBLOCKING
] = "NONBLOCKING",
32 static char *dnet_work_io_mode_str(int mode
)
34 if (mode
< 0 || mode
>= (int)ARRAY_SIZE(dnet_work_io_mode_string
))
37 return dnet_work_io_mode_string
[mode
];
40 static void dnet_work_pool_cleanup(struct dnet_work_pool
*pool
)
42 struct dnet_io_req
*r
, *tmp
;
43 struct dnet_work_io
*wio
, *wio_tmp
;
45 list_for_each_entry_safe(wio
, wio_tmp
, &pool
->wio_list
, wio_entry
) {
46 pthread_join(wio
->tid
, NULL
);
47 list_del(&wio
->wio_entry
);
52 list_for_each_entry_safe(r
, tmp
, &pool
->list
, req_entry
) {
53 list_del(&r
->req_entry
);
57 pthread_cond_destroy(&pool
->wait
);
58 pthread_mutex_destroy(&pool
->lock
);
62 static int dnet_work_pool_grow(struct dnet_node
*n
, struct dnet_work_pool
*pool
, int num
, void *(* process
)(void *))
65 struct dnet_work_io
*wio
, *tmp
;
67 pthread_mutex_lock(&pool
->lock
);
69 for (i
= 0; i
< num
; ++i
) {
70 wio
= malloc(sizeof(struct dnet_work_io
));
73 goto err_out_io_threads
;
76 wio
->thread_index
= i
;
78 list_add_tail(&wio
->wio_entry
, &pool
->wio_list
);
80 err
= pthread_create(&wio
->tid
, NULL
, process
, wio
);
83 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create IO thread: %d\n", err
);
84 goto err_out_io_threads
;
88 dnet_log(n
, DNET_LOG_INFO
, "Grew %s pool by: %d -> %d IO threads\n",
89 dnet_work_io_mode_str(pool
->mode
), pool
->num
, pool
->num
+ num
);
91 atomic_add(&pool
->avail
, num
);
93 pthread_mutex_unlock(&pool
->lock
);
98 list_for_each_entry_safe(wio
, tmp
, &pool
->wio_list
, wio_entry
) {
99 pthread_join(wio
->tid
, NULL
);
100 list_del(&wio
->wio_entry
);
104 pthread_mutex_unlock(&pool
->lock
);
109 static struct dnet_work_pool
*dnet_work_pool_alloc(struct dnet_node
*n
, int num
, int mode
, void *(* process
)(void *))
111 struct dnet_work_pool
*pool
;
114 pool
= malloc(sizeof(struct dnet_work_pool
));
120 memset(pool
, 0, sizeof(struct dnet_work_pool
));
123 atomic_set(&pool
->avail
, 0);
126 INIT_LIST_HEAD(&pool
->list
);
127 INIT_LIST_HEAD(&pool
->wio_list
);
129 err
= pthread_mutex_init(&pool
->lock
, NULL
);
135 err
= pthread_cond_init(&pool
->wait
, NULL
);
138 goto err_out_mutex_destroy
;
141 err
= dnet_work_pool_grow(n
, pool
, num
, process
);
143 goto err_out_cond_destroy
;
147 err_out_cond_destroy
:
148 pthread_cond_destroy(&pool
->wait
);
149 err_out_mutex_destroy
:
150 pthread_mutex_destroy(&pool
->lock
);
157 static void *dnet_io_process(void *data_
);
158 static void dnet_schedule_io(struct dnet_node
*n
, struct dnet_io_req
*r
)
160 struct dnet_io
*io
= n
->io
;
161 struct dnet_cmd
*cmd
= r
->header
;
162 int nonblocking
= !!(cmd
->flags
& DNET_FLAGS_NOLOCK
);
163 struct dnet_work_pool
*pool
= io
->recv_pool
;
166 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
167 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
),
168 (unsigned long long)cmd
->size
, nonblocking
);
169 } else if ((cmd
->size
== 0) && !(cmd
->flags
& DNET_FLAGS_MORE
) && (cmd
->trans
& DNET_TRANS_REPLY
)) {
170 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV ACK: %s: nonblocking: %d\n",
171 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
), nonblocking
);
173 unsigned long long tid
= cmd
->trans
& ~DNET_TRANS_REPLY
;
174 int reply
= !!(cmd
->trans
& DNET_TRANS_REPLY
);
176 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV: %s: nonblocking: %d, cmd-size: %llu, cflags: %llx, trans: %lld, reply: %d\n",
177 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
), nonblocking
,
178 (unsigned long long)cmd
->size
, (unsigned long long)cmd
->flags
, tid
, reply
);
183 pool
= io
->recv_pool_nb
;
185 #define cmd_is_exec_match(__cmd) (((__cmd)->cmd == DNET_CMD_EXEC) && ((__cmd)->size >= sizeof(struct sph)) && !((__cmd)->trans & DNET_TRANS_REPLY))
187 if (!list_empty(&pool
->list
) && cmd_is_exec_match(cmd
)) {
188 int pool_has_blocked_sph
= 0;
189 struct dnet_io_req
*tmp
;
192 pthread_mutex_lock(&pool
->lock
);
193 list_for_each_entry(tmp
, &pool
->list
, req_entry
) {
194 struct dnet_cmd
*tmp_cmd
= tmp
->header
;
195 unsigned long long tid
= tmp_cmd
->trans
& ~DNET_TRANS_REPLY
;
196 int reply
= !!(tmp_cmd
->trans
& DNET_TRANS_REPLY
);
197 unsigned long long sph_flags
= 0;
200 if (cmd_is_exec_match(tmp_cmd
)) {
201 sph
= (struct sph
*)tmp
->data
;
202 sph_flags
= sph
->flags
;
207 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: pool-grow: %s: cmd-size: %llu, cflags: %llx, "
208 "trans: %lld, reply: %d, sph-flags: %llx (match: %d), avail: %d\n",
209 dnet_state_dump_addr(tmp
->st
), dnet_dump_id(tmp
->header
), dnet_cmd_string(tmp_cmd
->cmd
),
210 (unsigned long long)tmp_cmd
->size
, (unsigned long long)tmp_cmd
->flags
,
211 tid
, reply
, sph_flags
, sph_match
, atomic_read(&pool
->avail
));
213 if (cmd_is_exec_match(tmp_cmd
)) {
214 sph
= (struct sph
*)tmp
->data
;
215 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
) {
216 pool_has_blocked_sph
= 1;
221 pthread_mutex_unlock(&pool
->lock
);
223 sph
= (struct sph
*)r
->data
;
224 if ((sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
) && pool_has_blocked_sph
&& (atomic_read(&pool
->avail
) == 0)) {
225 dnet_work_pool_grow(n
, pool
, pool
->num
/4+1, dnet_io_process
);
229 pthread_mutex_lock(&pool
->lock
);
230 list_add_tail(&r
->req_entry
, &pool
->list
);
231 pthread_cond_broadcast(&pool
->wait
);
232 pthread_mutex_unlock(&pool
->lock
);
236 void dnet_schedule_command(struct dnet_net_state
*st
)
238 st
->rcv_flags
= DNET_IO_CMD
;
242 struct dnet_cmd
*c
= &st
->rcv_cmd
;
243 unsigned long long tid
= c
->trans
& ~DNET_TRANS_REPLY
;
244 dnet_log(st
->n
, DNET_LOG_DEBUG
, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
245 (unsigned long long)c
->size
, tid
, tid
!= c
->trans
, st
->rcv_data
);
251 st
->rcv_end
= sizeof(struct dnet_cmd
);
255 static int dnet_process_recv_single(struct dnet_net_state
*st
)
257 struct dnet_node
*n
= st
->n
;
258 struct dnet_io_req
*r
;
265 * Reading command first.
267 if (st
->rcv_flags
& DNET_IO_CMD
)
271 data
+= st
->rcv_offset
;
272 size
= st
->rcv_end
- st
->rcv_offset
;
275 err
= recv(st
->read_s
, data
, size
, 0);
278 if (errno
!= EAGAIN
&& errno
!= EINTR
) {
280 dnet_log_err(n
, "failed to receive data, socket: %d", st
->read_s
);
288 dnet_log(n
, DNET_LOG_ERROR
, "Peer %s has disconnected.\n",
289 dnet_server_convert_dnet_addr(&st
->addr
));
294 st
->rcv_offset
+= err
;
297 if (st
->rcv_offset
!= st
->rcv_end
)
300 if (st
->rcv_flags
& DNET_IO_CMD
) {
301 unsigned long long tid
;
302 struct dnet_cmd
*c
= &st
->rcv_cmd
;
306 tid
= c
->trans
& ~DNET_TRANS_REPLY
;
308 dnet_log(n
, DNET_LOG_DEBUG
, "%s: received trans: %llu / %llx, "
309 "reply: %d, size: %llu, flags: %llx, status: %d.\n",
310 dnet_dump_id(&c
->id
), tid
, (unsigned long long)c
->trans
,
311 !!(c
->trans
& DNET_TRANS_REPLY
),
312 (unsigned long long)c
->size
, (unsigned long long)c
->flags
, c
->status
);
314 r
= malloc(c
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_req
));
319 memset(r
, 0, sizeof(struct dnet_io_req
));
322 r
->hsize
= sizeof(struct dnet_cmd
);
323 memcpy(r
->header
, &st
->rcv_cmd
, sizeof(struct dnet_cmd
));
326 st
->rcv_offset
= sizeof(struct dnet_io_req
) + sizeof(struct dnet_cmd
);
327 st
->rcv_end
= st
->rcv_offset
+ c
->size
;
328 st
->rcv_flags
&= ~DNET_IO_CMD
;
331 r
->data
= r
->header
+ sizeof(struct dnet_cmd
);
335 * We read the command header, now get the data.
344 dnet_schedule_command(st
);
346 r
->st
= dnet_state_get(st
);
348 dnet_schedule_io(n
, r
);
352 if (err
!= -EAGAIN
&& err
!= -EINTR
)
353 dnet_schedule_command(st
);
358 int dnet_state_accept_process(struct dnet_net_state
*orig
, struct epoll_event
*ev __unused
)
360 struct dnet_node
*n
= orig
->n
;
362 struct dnet_addr addr
;
363 struct dnet_net_state
*st
;
365 memset(&addr
, 0, sizeof(addr
));
367 addr
.addr_len
= sizeof(addr
.addr
);
368 cs
= accept(orig
->read_s
, (struct sockaddr
*)&addr
.addr
, &addr
.addr_len
);
372 dnet_log_err(n
, "failed to accept new client at %s", dnet_state_dump_addr(orig
));
376 dnet_set_sockopt(cs
);
378 st
= dnet_state_create(n
, 0, NULL
, 0, &addr
, cs
, &err
, 0, dnet_state_net_process
);
380 dnet_log(n
, DNET_LOG_ERROR
, "%s: Failed to create state for accepted client: %s [%d]\n",
381 dnet_server_convert_dnet_addr(&addr
), strerror(-err
), -err
);
386 dnet_log(n
, DNET_LOG_INFO
, "Accepted client %s, socket: %d.\n",
387 dnet_server_convert_dnet_addr(&addr
), cs
);
390 /* socket is closed in dnet_state_create() */
395 void dnet_unschedule_send(struct dnet_net_state
*st
)
397 struct epoll_event ev
;
399 ev
.events
= EPOLLOUT
;
402 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->write_s
, &ev
);
405 void dnet_unschedule_recv(struct dnet_net_state
*st
)
407 struct epoll_event ev
;
412 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->read_s
, &ev
);
415 static int dnet_process_send_single(struct dnet_net_state
*st
)
417 struct dnet_io_req
*r
= NULL
;
423 pthread_mutex_lock(&st
->send_lock
);
424 if (!list_empty(&st
->send_list
)) {
425 r
= list_first_entry(&st
->send_list
, struct dnet_io_req
, req_entry
);
427 dnet_unschedule_send(st
);
429 pthread_mutex_unlock(&st
->send_lock
);
436 err
= dnet_send_request(st
, r
);
445 static int dnet_schedule_network_io(struct dnet_net_state
*st
, int send
)
447 struct epoll_event ev
;
451 ev
.events
= EPOLLOUT
;
459 err
= epoll_ctl(st
->epoll_fd
, EPOLL_CTL_ADD
, fd
, &ev
);
463 if (err
== -EEXIST
) {
466 dnet_log_err(st
->n
, "%s: failed to add %s event", dnet_state_dump_addr(st
), send
? "SEND" : "RECV");
473 int dnet_schedule_send(struct dnet_net_state
*st
)
475 return dnet_schedule_network_io(st
, 1);
478 int dnet_schedule_recv(struct dnet_net_state
*st
)
480 return dnet_schedule_network_io(st
, 0);
483 int dnet_state_net_process(struct dnet_net_state
*st
, struct epoll_event
*ev
)
485 int err
= -ECONNRESET
;
487 if (ev
->events
& EPOLLIN
) {
488 err
= dnet_process_recv_single(st
);
489 if (err
&& (err
!= -EAGAIN
))
492 if (ev
->events
& EPOLLOUT
) {
493 err
= dnet_process_send_single(st
);
494 if (err
&& (err
!= -EAGAIN
))
498 if (ev
->events
& (EPOLLHUP
| EPOLLERR
)) {
499 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: received error event mask %x\n", dnet_state_dump_addr(st
), ev
->events
);
506 static void *dnet_io_process_network(void *data_
)
508 struct dnet_net_io
*nio
= data_
;
509 struct dnet_node
*n
= nio
->n
;
510 struct dnet_net_state
*st
;
511 struct epoll_event ev
;
513 struct dnet_trans
*t
, *tmp
;
515 struct list_head head
;
517 dnet_set_name("net_pool");
519 while (!n
->need_exit
) {
520 err
= epoll_wait(nio
->epoll_fd
, &ev
, 1, 1000);
527 if (err
== -EAGAIN
|| err
== -EINTR
)
530 dnet_log_err(n
, "Failed to wait for IO fds");
536 st
->epoll_fd
= nio
->epoll_fd
;
540 err
= st
->process(st
, &ev
);
544 if (err
== -EAGAIN
&& st
->stall
< DNET_DEFAULT_STALL_TRANSACTIONS
)
547 if (err
< 0 || st
->stall
>= DNET_DEFAULT_STALL_TRANSACTIONS
) {
548 dnet_state_reset(st
);
557 gettimeofday(&tv
, NULL
);
559 INIT_LIST_HEAD(&head
);
561 pthread_mutex_lock(&st
->trans_lock
);
562 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_list_entry
) {
563 if (t
->time
.tv_sec
>= tv
.tv_sec
)
566 dnet_trans_remove_nolock(&st
->trans_root
, t
);
567 list_move(&t
->trans_list_entry
, &head
);
569 pthread_mutex_unlock(&st
->trans_lock
);
571 list_for_each_entry_safe(t
, tmp
, &head
, trans_list_entry
) {
572 list_del_init(&t
->trans_list_entry
);
576 t
->cmd
.status
= -ETIMEDOUT
;
578 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: destructing trans: %llu on TIMEOUT\n",
579 dnet_state_dump_addr(st
), (unsigned long long)t
->trans
);
582 t
->complete(st
, &t
->cmd
, t
->priv
);
588 return &n
->need_exit
;
591 static void dnet_io_cleanup_states(struct dnet_node
*n
)
593 struct dnet_net_state
*st
, *tmp
;
595 list_for_each_entry_safe(st
, tmp
, &n
->storage_state_list
, storage_state_entry
) {
596 dnet_state_reset(st
);
600 struct dnet_io_process_data
{
605 static void *dnet_io_process(void *data_
)
607 struct dnet_work_io
*wio
= data_
;
608 struct dnet_work_pool
*pool
= wio
->pool
;
609 struct dnet_node
*n
= pool
->n
;
610 struct dnet_net_state
*st
;
613 struct dnet_io_req
*r
;
616 dnet_set_name("io_pool");
618 while (!n
->need_exit
) {
622 gettimeofday(&tv
, NULL
);
623 ts
.tv_sec
= tv
.tv_sec
+ 1;
624 ts
.tv_nsec
= tv
.tv_usec
* 1000;
626 pthread_mutex_lock(&pool
->lock
);
628 if (!list_empty(&pool
->list
)) {
629 r
= list_first_entry(&pool
->list
, struct dnet_io_req
, req_entry
);
631 err
= pthread_cond_timedwait(&pool
->wait
, &pool
->lock
, &ts
);
632 if (!list_empty(&pool
->list
)) {
633 r
= list_first_entry(&pool
->list
, struct dnet_io_req
, req_entry
);
639 list_del_init(&r
->req_entry
);
640 atomic_dec(&pool
->avail
);
642 pthread_mutex_unlock(&pool
->lock
);
649 dnet_log(n
, DNET_LOG_DEBUG
, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, mode: %s\n",
650 dnet_state_dump_addr(st
), dnet_dump_id(r
->header
), r
, r
->hsize
, r
->dsize
, dnet_work_io_mode_str(pool
->mode
));
652 err
= dnet_process_recv(st
, r
);
657 atomic_inc(&pool
->avail
);
663 int dnet_io_init(struct dnet_node
*n
, struct dnet_config
*cfg
)
667 int io_size
= sizeof(struct dnet_io
) + sizeof(struct dnet_net_io
) * cfg
->net_thread_num
;
669 io
= malloc(io_size
);
675 memset(io
, 0, io_size
);
677 io
->net_thread_num
= cfg
->net_thread_num
;
678 io
->net_thread_pos
= 0;
679 io
->net
= (struct dnet_net_io
*)(io
+ 1);
681 io
->recv_pool
= dnet_work_pool_alloc(n
, cfg
->io_thread_num
, DNET_WORK_IO_MODE_BLOCKING
, dnet_io_process
);
682 if (!io
->recv_pool
) {
687 io
->recv_pool_nb
= dnet_work_pool_alloc(n
, cfg
->nonblocking_io_thread_num
, DNET_WORK_IO_MODE_NONBLOCKING
, dnet_io_process
);
688 if (!io
->recv_pool_nb
) {
690 goto err_out_free_recv_pool
;
693 for (i
=0; i
<io
->net_thread_num
; ++i
) {
694 struct dnet_net_io
*nio
= &io
->net
[i
];
698 nio
->epoll_fd
= epoll_create(10000);
699 if (nio
->epoll_fd
< 0) {
701 dnet_log_err(n
, "Failed to create epoll fd");
702 goto err_out_net_destroy
;
705 fcntl(nio
->epoll_fd
, F_SETFD
, FD_CLOEXEC
);
706 fcntl(nio
->epoll_fd
, F_SETFL
, O_NONBLOCK
);
708 err
= pthread_create(&nio
->tid
, NULL
, dnet_io_process_network
, nio
);
710 close(nio
->epoll_fd
);
712 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create network processing thread: %d\n", err
);
713 goto err_out_net_destroy
;
722 pthread_join(io
->net
[i
].tid
, NULL
);
723 close(io
->net
[i
].epoll_fd
);
726 dnet_work_pool_cleanup(io
->recv_pool_nb
);
727 err_out_free_recv_pool
:
728 dnet_work_pool_cleanup(io
->recv_pool
);
736 void dnet_io_exit(struct dnet_node
*n
)
738 struct dnet_io
*io
= n
->io
;
743 for (i
=0; i
<io
->net_thread_num
; ++i
) {
744 pthread_join(io
->net
[i
].tid
, NULL
);
745 close(io
->net
[i
].epoll_fd
);
748 dnet_work_pool_cleanup(io
->recv_pool_nb
);
749 dnet_work_pool_cleanup(io
->recv_pool
);
751 dnet_io_cleanup_states(n
);