2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
27 static char *dnet_work_io_mode_string
[] = {
28 [DNET_WORK_IO_MODE_BLOCKING
] = "BLOCKING",
29 [DNET_WORK_IO_MODE_NONBLOCKING
] = "NONBLOCKING",
32 static char *dnet_work_io_mode_str(int mode
)
34 if (mode
< 0 || mode
>= (int)ARRAY_SIZE(dnet_work_io_mode_string
))
37 return dnet_work_io_mode_string
[mode
];
40 static void dnet_work_pool_cleanup(struct dnet_work_pool
*pool
)
42 struct dnet_io_req
*r
, *tmp
;
43 struct dnet_work_io
*wio
, *wio_tmp
;
45 list_for_each_entry_safe(wio
, wio_tmp
, &pool
->wio_list
, wio_entry
) {
46 pthread_join(wio
->tid
, NULL
);
47 list_del(&wio
->wio_entry
);
52 list_for_each_entry_safe(r
, tmp
, &pool
->list
, req_entry
) {
53 list_del(&r
->req_entry
);
57 pthread_cond_destroy(&pool
->wait
);
58 pthread_mutex_destroy(&pool
->lock
);
62 static int dnet_work_pool_grow(struct dnet_node
*n
, struct dnet_work_pool
*pool
, int num
, void *(* process
)(void *))
65 struct dnet_work_io
*wio
, *tmp
;
67 pthread_mutex_lock(&pool
->lock
);
69 for (i
= 0; i
< num
; ++i
) {
70 wio
= malloc(sizeof(struct dnet_work_io
));
73 goto err_out_io_threads
;
76 wio
->thread_index
= i
;
78 list_add_tail(&wio
->wio_entry
, &pool
->wio_list
);
80 err
= pthread_create(&wio
->tid
, NULL
, process
, wio
);
83 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create IO thread: %d\n", err
);
84 goto err_out_io_threads
;
88 dnet_log(n
, DNET_LOG_INFO
, "Grew %s pool by: %d -> %d IO threads\n",
89 dnet_work_io_mode_str(pool
->mode
), pool
->num
, pool
->num
+ num
);
92 pthread_mutex_unlock(&pool
->lock
);
97 list_for_each_entry_safe(wio
, tmp
, &pool
->wio_list
, wio_entry
) {
98 pthread_join(wio
->tid
, NULL
);
99 list_del(&wio
->wio_entry
);
103 pthread_mutex_unlock(&pool
->lock
);
108 static struct dnet_work_pool
*dnet_work_pool_alloc(struct dnet_node
*n
, int num
, int mode
, void *(* process
)(void *))
110 struct dnet_work_pool
*pool
;
113 pool
= malloc(sizeof(struct dnet_work_pool
));
119 memset(pool
, 0, sizeof(struct dnet_work_pool
));
124 INIT_LIST_HEAD(&pool
->list
);
125 INIT_LIST_HEAD(&pool
->wio_list
);
127 err
= pthread_mutex_init(&pool
->lock
, NULL
);
133 err
= pthread_cond_init(&pool
->wait
, NULL
);
136 goto err_out_mutex_destroy
;
139 err
= dnet_work_pool_grow(n
, pool
, num
, process
);
141 goto err_out_cond_destroy
;
145 err_out_cond_destroy
:
146 pthread_cond_destroy(&pool
->wait
);
147 err_out_mutex_destroy
:
148 pthread_mutex_destroy(&pool
->lock
);
155 static void *dnet_io_process(void *data_
);
156 static void dnet_schedule_io(struct dnet_node
*n
, struct dnet_io_req
*r
)
158 struct dnet_io
*io
= n
->io
;
159 struct dnet_cmd
*cmd
= r
->header
;
160 int nonblocking
= !!(cmd
->flags
& DNET_FLAGS_NOLOCK
);
161 struct dnet_work_pool
*pool
= io
->recv_pool
;
164 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
165 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
),
166 (unsigned long long)cmd
->size
, nonblocking
);
167 } else if ((cmd
->size
== 0) && !(cmd
->flags
& DNET_FLAGS_MORE
) && (cmd
->trans
& DNET_TRANS_REPLY
)) {
168 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV ACK: %s: nonblocking: %d\n",
169 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
), nonblocking
);
171 unsigned long long tid
= cmd
->trans
& ~DNET_TRANS_REPLY
;
172 int reply
= !!(cmd
->trans
& DNET_TRANS_REPLY
);
174 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV: %s: nonblocking: %d, cmd-size: %llu, cflags: %llx, trans: %lld, reply: %d\n",
175 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
), nonblocking
,
176 (unsigned long long)cmd
->size
, (unsigned long long)cmd
->flags
, tid
, reply
);
181 pool
= io
->recv_pool_nb
;
183 if (list_empty(&pool
->list
) && (cmd
->cmd
== DNET_CMD_EXEC
) && (cmd
->size
>= sizeof(struct sph
))) {
184 struct sph
*sph
= (struct sph
*)r
->data
;
185 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
) {
186 dnet_work_pool_grow(n
, pool
, pool
->num
/4+1, dnet_io_process
);
190 pthread_mutex_lock(&pool
->lock
);
191 list_add_tail(&r
->req_entry
, &pool
->list
);
192 pthread_cond_broadcast(&pool
->wait
);
193 pthread_mutex_unlock(&pool
->lock
);
197 void dnet_schedule_command(struct dnet_net_state
*st
)
199 st
->rcv_flags
= DNET_IO_CMD
;
203 struct dnet_cmd
*c
= &st
->rcv_cmd
;
204 unsigned long long tid
= c
->trans
& ~DNET_TRANS_REPLY
;
205 dnet_log(st
->n
, DNET_LOG_DEBUG
, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
206 (unsigned long long)c
->size
, tid
, tid
!= c
->trans
, st
->rcv_data
);
212 st
->rcv_end
= sizeof(struct dnet_cmd
);
216 static int dnet_process_recv_single(struct dnet_net_state
*st
)
218 struct dnet_node
*n
= st
->n
;
219 struct dnet_io_req
*r
;
226 * Reading command first.
228 if (st
->rcv_flags
& DNET_IO_CMD
)
232 data
+= st
->rcv_offset
;
233 size
= st
->rcv_end
- st
->rcv_offset
;
236 err
= recv(st
->read_s
, data
, size
, 0);
239 if (errno
!= EAGAIN
&& errno
!= EINTR
) {
241 dnet_log_err(n
, "failed to receive data, socket: %d", st
->read_s
);
249 dnet_log(n
, DNET_LOG_ERROR
, "Peer %s has disconnected.\n",
250 dnet_server_convert_dnet_addr(&st
->addr
));
255 st
->rcv_offset
+= err
;
258 if (st
->rcv_offset
!= st
->rcv_end
)
261 if (st
->rcv_flags
& DNET_IO_CMD
) {
262 unsigned long long tid
;
263 struct dnet_cmd
*c
= &st
->rcv_cmd
;
267 tid
= c
->trans
& ~DNET_TRANS_REPLY
;
269 dnet_log(n
, DNET_LOG_DEBUG
, "%s: received trans: %llu / %llx, "
270 "reply: %d, size: %llu, flags: %llx, status: %d.\n",
271 dnet_dump_id(&c
->id
), tid
, (unsigned long long)c
->trans
,
272 !!(c
->trans
& DNET_TRANS_REPLY
),
273 (unsigned long long)c
->size
, (unsigned long long)c
->flags
, c
->status
);
275 r
= malloc(c
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_req
));
280 memset(r
, 0, sizeof(struct dnet_io_req
));
283 r
->hsize
= sizeof(struct dnet_cmd
);
284 memcpy(r
->header
, &st
->rcv_cmd
, sizeof(struct dnet_cmd
));
287 st
->rcv_offset
= sizeof(struct dnet_io_req
) + sizeof(struct dnet_cmd
);
288 st
->rcv_end
= st
->rcv_offset
+ c
->size
;
289 st
->rcv_flags
&= ~DNET_IO_CMD
;
292 r
->data
= r
->header
+ sizeof(struct dnet_cmd
);
296 * We read the command header, now get the data.
305 dnet_schedule_command(st
);
307 r
->st
= dnet_state_get(st
);
309 dnet_schedule_io(n
, r
);
313 if (err
!= -EAGAIN
&& err
!= -EINTR
)
314 dnet_schedule_command(st
);
319 int dnet_state_accept_process(struct dnet_net_state
*orig
, struct epoll_event
*ev __unused
)
321 struct dnet_node
*n
= orig
->n
;
323 struct dnet_addr addr
;
324 struct dnet_net_state
*st
;
326 memset(&addr
, 0, sizeof(addr
));
328 addr
.addr_len
= sizeof(addr
.addr
);
329 cs
= accept(orig
->read_s
, (struct sockaddr
*)&addr
.addr
, &addr
.addr_len
);
333 dnet_log_err(n
, "failed to accept new client at %s", dnet_state_dump_addr(orig
));
337 dnet_set_sockopt(cs
);
339 st
= dnet_state_create(n
, 0, NULL
, 0, &addr
, cs
, &err
, 0, dnet_state_net_process
);
341 dnet_log(n
, DNET_LOG_ERROR
, "%s: Failed to create state for accepted client: %s [%d]\n",
342 dnet_server_convert_dnet_addr(&addr
), strerror(-err
), -err
);
347 dnet_log(n
, DNET_LOG_INFO
, "Accepted client %s, socket: %d.\n",
348 dnet_server_convert_dnet_addr(&addr
), cs
);
351 /* socket is closed in dnet_state_create() */
356 void dnet_unschedule_send(struct dnet_net_state
*st
)
358 struct epoll_event ev
;
360 ev
.events
= EPOLLOUT
;
363 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->write_s
, &ev
);
366 void dnet_unschedule_recv(struct dnet_net_state
*st
)
368 struct epoll_event ev
;
373 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->read_s
, &ev
);
376 static int dnet_process_send_single(struct dnet_net_state
*st
)
378 struct dnet_io_req
*r
= NULL
;
384 pthread_mutex_lock(&st
->send_lock
);
385 if (!list_empty(&st
->send_list
)) {
386 r
= list_first_entry(&st
->send_list
, struct dnet_io_req
, req_entry
);
388 dnet_unschedule_send(st
);
390 pthread_mutex_unlock(&st
->send_lock
);
397 err
= dnet_send_request(st
, r
);
406 static int dnet_schedule_network_io(struct dnet_net_state
*st
, int send
)
408 struct epoll_event ev
;
412 ev
.events
= EPOLLOUT
;
420 err
= epoll_ctl(st
->epoll_fd
, EPOLL_CTL_ADD
, fd
, &ev
);
424 if (err
== -EEXIST
) {
427 dnet_log_err(st
->n
, "%s: failed to add %s event", dnet_state_dump_addr(st
), send
? "SEND" : "RECV");
434 int dnet_schedule_send(struct dnet_net_state
*st
)
436 return dnet_schedule_network_io(st
, 1);
439 int dnet_schedule_recv(struct dnet_net_state
*st
)
441 return dnet_schedule_network_io(st
, 0);
444 int dnet_state_net_process(struct dnet_net_state
*st
, struct epoll_event
*ev
)
446 int err
= -ECONNRESET
;
448 if (ev
->events
& EPOLLIN
) {
449 err
= dnet_process_recv_single(st
);
450 if (err
&& (err
!= -EAGAIN
))
453 if (ev
->events
& EPOLLOUT
) {
454 err
= dnet_process_send_single(st
);
455 if (err
&& (err
!= -EAGAIN
))
459 if (ev
->events
& (EPOLLHUP
| EPOLLERR
)) {
460 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: received error event mask %x\n", dnet_state_dump_addr(st
), ev
->events
);
467 static void *dnet_io_process_network(void *data_
)
469 struct dnet_net_io
*nio
= data_
;
470 struct dnet_node
*n
= nio
->n
;
471 struct dnet_net_state
*st
;
472 struct epoll_event ev
;
474 struct dnet_trans
*t
, *tmp
;
476 struct list_head head
;
478 dnet_set_name("net_pool");
480 while (!n
->need_exit
) {
481 err
= epoll_wait(nio
->epoll_fd
, &ev
, 1, 1000);
488 if (err
== -EAGAIN
|| err
== -EINTR
)
491 dnet_log_err(n
, "Failed to wait for IO fds");
497 st
->epoll_fd
= nio
->epoll_fd
;
501 err
= st
->process(st
, &ev
);
505 if (err
== -EAGAIN
&& st
->stall
< DNET_DEFAULT_STALL_TRANSACTIONS
)
508 if (err
< 0 || st
->stall
>= DNET_DEFAULT_STALL_TRANSACTIONS
) {
509 dnet_state_reset(st
);
518 gettimeofday(&tv
, NULL
);
520 INIT_LIST_HEAD(&head
);
522 pthread_mutex_lock(&st
->trans_lock
);
523 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_list_entry
) {
524 if (t
->time
.tv_sec
>= tv
.tv_sec
)
527 dnet_trans_remove_nolock(&st
->trans_root
, t
);
528 list_move(&t
->trans_list_entry
, &head
);
530 pthread_mutex_unlock(&st
->trans_lock
);
532 list_for_each_entry_safe(t
, tmp
, &head
, trans_list_entry
) {
533 list_del_init(&t
->trans_list_entry
);
537 t
->cmd
.status
= -ETIMEDOUT
;
539 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: destructing trans: %llu on TIMEOUT\n",
540 dnet_state_dump_addr(st
), (unsigned long long)t
->trans
);
543 t
->complete(st
, &t
->cmd
, t
->priv
);
549 return &n
->need_exit
;
552 static void dnet_io_cleanup_states(struct dnet_node
*n
)
554 struct dnet_net_state
*st
, *tmp
;
556 list_for_each_entry_safe(st
, tmp
, &n
->storage_state_list
, storage_state_entry
) {
557 dnet_state_reset(st
);
561 struct dnet_io_process_data
{
566 static void *dnet_io_process(void *data_
)
568 struct dnet_work_io
*wio
= data_
;
569 struct dnet_work_pool
*pool
= wio
->pool
;
570 struct dnet_node
*n
= pool
->n
;
571 struct dnet_net_state
*st
;
574 struct dnet_io_req
*r
;
577 dnet_set_name("io_pool");
579 while (!n
->need_exit
) {
583 gettimeofday(&tv
, NULL
);
584 ts
.tv_sec
= tv
.tv_sec
+ 1;
585 ts
.tv_nsec
= tv
.tv_usec
* 1000;
587 pthread_mutex_lock(&pool
->lock
);
589 if (!list_empty(&pool
->list
)) {
590 r
= list_first_entry(&pool
->list
, struct dnet_io_req
, req_entry
);
592 err
= pthread_cond_timedwait(&pool
->wait
, &pool
->lock
, &ts
);
593 if (!list_empty(&pool
->list
)) {
594 r
= list_first_entry(&pool
->list
, struct dnet_io_req
, req_entry
);
600 list_del_init(&r
->req_entry
);
601 pthread_mutex_unlock(&pool
->lock
);
608 dnet_log(n
, DNET_LOG_DEBUG
, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, mode: %s\n",
609 dnet_state_dump_addr(st
), dnet_dump_id(r
->header
), r
, r
->hsize
, r
->dsize
, dnet_work_io_mode_str(pool
->mode
));
611 err
= dnet_process_recv(st
, r
);
620 int dnet_io_init(struct dnet_node
*n
, struct dnet_config
*cfg
)
624 int io_size
= sizeof(struct dnet_io
) + sizeof(struct dnet_net_io
) * cfg
->net_thread_num
;
626 io
= malloc(io_size
);
632 memset(io
, 0, io_size
);
634 io
->net_thread_num
= cfg
->net_thread_num
;
635 io
->net_thread_pos
= 0;
636 io
->net
= (struct dnet_net_io
*)(io
+ 1);
638 io
->recv_pool
= dnet_work_pool_alloc(n
, cfg
->io_thread_num
, DNET_WORK_IO_MODE_BLOCKING
, dnet_io_process
);
639 if (!io
->recv_pool
) {
644 io
->recv_pool_nb
= dnet_work_pool_alloc(n
, cfg
->nonblocking_io_thread_num
, DNET_WORK_IO_MODE_NONBLOCKING
, dnet_io_process
);
645 if (!io
->recv_pool_nb
) {
647 goto err_out_free_recv_pool
;
650 for (i
=0; i
<io
->net_thread_num
; ++i
) {
651 struct dnet_net_io
*nio
= &io
->net
[i
];
655 nio
->epoll_fd
= epoll_create(10000);
656 if (nio
->epoll_fd
< 0) {
658 dnet_log_err(n
, "Failed to create epoll fd");
659 goto err_out_net_destroy
;
662 fcntl(nio
->epoll_fd
, F_SETFD
, FD_CLOEXEC
);
663 fcntl(nio
->epoll_fd
, F_SETFL
, O_NONBLOCK
);
665 err
= pthread_create(&nio
->tid
, NULL
, dnet_io_process_network
, nio
);
667 close(nio
->epoll_fd
);
669 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create network processing thread: %d\n", err
);
670 goto err_out_net_destroy
;
679 pthread_join(io
->net
[i
].tid
, NULL
);
680 close(io
->net
[i
].epoll_fd
);
683 dnet_work_pool_cleanup(io
->recv_pool_nb
);
684 err_out_free_recv_pool
:
685 dnet_work_pool_cleanup(io
->recv_pool
);
693 void dnet_io_exit(struct dnet_node
*n
)
695 struct dnet_io
*io
= n
->io
;
700 for (i
=0; i
<io
->net_thread_num
; ++i
) {
701 pthread_join(io
->net
[i
].tid
, NULL
);
702 close(io
->net
[i
].epoll_fd
);
705 dnet_work_pool_cleanup(io
->recv_pool_nb
);
706 dnet_work_pool_cleanup(io
->recv_pool
);
708 dnet_io_cleanup_states(n
);