2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
/*
 * Printable names of the work-pool I/O modes, indexed by the
 * DNET_WORK_IO_MODE_* constants through designated initializers.
 * NOTE(review): the end of this initializer is not visible in this
 * listing, so further entries may exist - confirm against the full file.
 */
27 static char *dnet_work_io_mode_string
[] = {
28 [DNET_WORK_IO_MODE_BLOCKING
] = "BLOCKING",
29 [DNET_WORK_IO_MODE_NONBLOCKING
] = "NONBLOCKING",
/*
 * Translate a work-pool I/O mode into its printable name.
 *
 * @mode is range-checked against the dnet_work_io_mode_string table
 * before it is used as an index. The value returned for an out-of-range
 * mode is on a line missing from this listing.
 */
32 static char *dnet_work_io_mode_str(int mode
)
34 if (mode
< 0 || mode
>= (int)ARRAY_SIZE(dnet_work_io_mode_string
))
37 return dnet_work_io_mode_string
[mode
];
/*
 * Queue a fully received request @r on one of node @n's receive work
 * pools and wake the workers of that pool.
 *
 * The pool starts out as io->recv_pool. The visible code can redirect it
 * to io->recv_pool_nb, and from there to io->recv_pool_eblock for a
 * DNET_CMD_EXEC command whose sph header has DNET_SPH_FLAGS_SRC_BLOCK set.
 * NOTE(review): the conditions guarding the debug-log branches and the
 * switch to recv_pool_nb are missing from this listing; the latter is
 * presumably `nonblocking` - confirm against the full file.
 */
40 static void dnet_schedule_io(struct dnet_node
*n
, struct dnet_io_req
*r
)
42 struct dnet_io
*io
= n
->io
;
43 struct dnet_cmd
*cmd
= r
->header
;
/* Non-zero when the command carries DNET_FLAGS_NOLOCK. */
44 int nonblocking
= !!(cmd
->flags
& DNET_FLAGS_NOLOCK
);
45 struct dnet_work_pool
*pool
= io
->recv_pool
;
48 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
49 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
),
50 (unsigned long long)cmd
->size
, nonblocking
);
/* Empty, final reply: logged as an ACK. */
51 } else if ((cmd
->size
== 0) && !(cmd
->flags
& DNET_FLAGS_MORE
) && (cmd
->trans
& DNET_TRANS_REPLY
)) {
52 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV ACK: %s: nonblocking: %d\n",
53 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
), nonblocking
);
/* Transaction id with the reply bit masked off, and the reply bit itself. */
55 unsigned long long tid
= cmd
->trans
& ~DNET_TRANS_REPLY
;
56 int reply
= !!(cmd
->trans
& DNET_TRANS_REPLY
);
58 dnet_log(r
->st
->n
, DNET_LOG_DEBUG
, "%s: %s: RECV: %s: nonblocking: %d, cmd-size: %llu, cflags: %llx, trans: %lld, reply: %d\n",
59 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(cmd
->cmd
), nonblocking
,
60 (unsigned long long)cmd
->size
, (unsigned long long)cmd
->flags
, tid
, reply
);
65 pool
= io
->recv_pool_nb
;
/*
 * The payload is only interpreted as a struct sph when the command is
 * DNET_CMD_EXEC and is at least sizeof(struct sph) bytes long.
 */
66 if ((cmd
->cmd
== DNET_CMD_EXEC
) && (cmd
->size
>= sizeof(struct sph
))) {
67 struct sph
*sph
= (struct sph
*)r
->data
;
68 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
)
69 pool
= io
->recv_pool_eblock
;
/* Append under the pool lock and wake every waiting worker. */
72 pthread_mutex_lock(&pool
->lock
);
73 list_add_tail(&r
->req_entry
, &pool
->list
);
74 pthread_cond_broadcast(&pool
->wait
);
75 pthread_mutex_unlock(&pool
->lock
);
/*
 * Re-arm @st to receive the next command header: rcv_flags is set to
 * DNET_IO_CMD and rcv_end to sizeof(struct dnet_cmd).
 *
 * The debug message in between reports the size and transaction id of
 * st->rcv_cmd together with the st->rcv_data pointer.
 * NOTE(review): the condition around that log and the statements that
 * release rcv_data / reset rcv_offset are missing from this listing.
 */
78 void dnet_schedule_command(struct dnet_net_state
*st
)
80 st
->rcv_flags
= DNET_IO_CMD
;
84 struct dnet_cmd
*c
= &st
->rcv_cmd
;
85 unsigned long long tid
= c
->trans
& ~DNET_TRANS_REPLY
;
/* `tid != c->trans` is true exactly when the reply bit was set. */
86 dnet_log(st
->n
, DNET_LOG_DEBUG
, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
87 (unsigned long long)c
->size
, tid
, tid
!= c
->trans
, st
->rcv_data
);
93 st
->rcv_end
= sizeof(struct dnet_cmd
);
/*
 * Perform one recv() on st->read_s and advance the receive state of @st.
 *
 * The window to fill is [rcv_offset, rcv_end). While DNET_IO_CMD is set in
 * rcv_flags the bytes belong to the command header. Once the header window
 * is full, a dnet_io_req is allocated with room for the request
 * descriptor, a copy of the header and c->size payload bytes;
 * DNET_IO_CMD is cleared and the window is moved to cover the payload.
 * A completed request takes a state reference (dnet_state_get) and is
 * handed to dnet_schedule_io(); dnet_schedule_command() re-arms header
 * reception.
 *
 * NOTE(review): many lines of this function (local declarations, branch
 * conditions, labels, return statements) are missing from this listing;
 * the description above follows the visible statements only.
 */
97 static int dnet_process_recv_single(struct dnet_net_state
*st
)
99 struct dnet_node
*n
= st
->n
;
100 struct dnet_io_req
*r
;
107 * Reading command first.
109 if (st
->rcv_flags
& DNET_IO_CMD
)
113 data
+= st
->rcv_offset
;
114 size
= st
->rcv_end
- st
->rcv_offset
;
117 err
= recv(st
->read_s
, data
, size
, 0);
/* EAGAIN and EINTR are not reported as receive failures. */
120 if (errno
!= EAGAIN
&& errno
!= EINTR
) {
122 dnet_log_err(n
, "failed to receive data, socket: %d", st
->read_s
);
130 dnet_log(n
, DNET_LOG_ERROR
, "Peer %s has disconnected.\n",
131 dnet_server_convert_dnet_addr(&st
->addr
));
136 st
->rcv_offset
+= err
;
/* Window not full yet: more recv() calls are needed. */
139 if (st
->rcv_offset
!= st
->rcv_end
)
142 if (st
->rcv_flags
& DNET_IO_CMD
) {
143 unsigned long long tid
;
144 struct dnet_cmd
*c
= &st
->rcv_cmd
;
148 tid
= c
->trans
& ~DNET_TRANS_REPLY
;
150 dnet_log(n
, DNET_LOG_DEBUG
, "%s: received trans: %llu / %llx, "
151 "reply: %d, size: %llu, flags: %llx, status: %d.\n",
152 dnet_dump_id(&c
->id
), tid
, (unsigned long long)c
->trans
,
153 !!(c
->trans
& DNET_TRANS_REPLY
),
154 (unsigned long long)c
->size
, (unsigned long long)c
->flags
, c
->status
);
/*
 * NOTE(review): c->size was just received from the peer; no upper bound
 * check is visible before it is used in this allocation size - confirm
 * one exists in the missing lines.
 */
156 r
= malloc(c
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_req
));
/* Only the request descriptor is zeroed, not the header/payload area. */
161 memset(r
, 0, sizeof(struct dnet_io_req
));
164 r
->hsize
= sizeof(struct dnet_cmd
);
165 memcpy(r
->header
, &st
->rcv_cmd
, sizeof(struct dnet_cmd
));
/*
 * The new window skips the descriptor and the header copy and spans
 * c->size bytes; presumably offsets are now relative to the start of
 * the allocation - confirm against the missing lines.
 */
168 st
->rcv_offset
= sizeof(struct dnet_io_req
) + sizeof(struct dnet_cmd
);
169 st
->rcv_end
= st
->rcv_offset
+ c
->size
;
170 st
->rcv_flags
&= ~DNET_IO_CMD
;
173 r
->data
= r
->header
+ sizeof(struct dnet_cmd
);
177 * We read the command header, now get the data.
186 dnet_schedule_command(st
);
188 r
->st
= dnet_state_get(st
);
190 dnet_schedule_io(n
, r
);
/* Any error other than -EAGAIN/-EINTR re-arms header reception. */
194 if (err
!= -EAGAIN
&& err
!= -EINTR
)
195 dnet_schedule_command(st
);
/*
 * Epoll handler for a listening state: accept one connection on
 * orig->read_s, apply dnet_set_sockopt() to the new socket and create a
 * network state for it with dnet_state_net_process as its handler.
 *
 * @ev is unused. Both an accept() failure and a dnet_state_create()
 * failure are logged.
 * NOTE(review): the failure conditions and return statements are on
 * lines missing from this listing.
 */
200 int dnet_state_accept_process(struct dnet_net_state
*orig
, struct epoll_event
*ev __unused
)
202 struct dnet_node
*n
= orig
->n
;
204 struct dnet_addr addr
;
205 struct dnet_net_state
*st
;
207 memset(&addr
, 0, sizeof(addr
));
/* addr_len is in/out for accept(): capacity on entry. */
209 addr
.addr_len
= sizeof(addr
.addr
);
210 cs
= accept(orig
->read_s
, (struct sockaddr
*)&addr
.addr
, &addr
.addr_len
);
214 dnet_log_err(n
, "failed to accept new client at %s", dnet_state_dump_addr(orig
));
218 dnet_set_sockopt(cs
);
220 st
= dnet_state_create(n
, 0, NULL
, 0, &addr
, cs
, &err
, 0, dnet_state_net_process
);
222 dnet_log(n
, DNET_LOG_ERROR
, "%s: Failed to create state for accepted client: %s [%d]\n",
223 dnet_server_convert_dnet_addr(&addr
), strerror(-err
), -err
);
228 dnet_log(n
, DNET_LOG_INFO
, "Accepted client %s, socket: %d.\n",
229 dnet_server_convert_dnet_addr(&addr
), cs
);
232 /* socket is closed in dnet_state_create() */
/*
 * Remove st->write_s from the state's epoll set (EPOLL_CTL_DEL).
 * The result of epoll_ctl() is ignored on the visible line.
 * NOTE(review): one statement between the ev.events assignment and the
 * epoll_ctl() call is missing from this listing.
 */
237 void dnet_unschedule_send(struct dnet_net_state
*st
)
239 struct epoll_event ev
;
241 ev
.events
= EPOLLOUT
;
244 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->write_s
, &ev
);
/*
 * Remove st->read_s from the state's epoll set (EPOLL_CTL_DEL).
 * The result of epoll_ctl() is ignored on the visible line.
 * NOTE(review): the statements that fill in `ev` are missing from this
 * listing.
 */
247 void dnet_unschedule_recv(struct dnet_net_state
*st
)
249 struct epoll_event ev
;
254 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->read_s
, &ev
);
/*
 * Send path for one epoll wake-up: under st->send_lock, look at the
 * first request on st->send_list, then pass the request to
 * dnet_send_request().
 *
 * dnet_unschedule_send() is also called inside the locked region.
 * NOTE(review): presumably that call is the branch taken when the list
 * is empty (the `else` line is missing from this listing), as are the
 * handling of r == NULL and the return statement - confirm.
 */
257 static int dnet_process_send_single(struct dnet_net_state
*st
)
259 struct dnet_io_req
*r
= NULL
;
265 pthread_mutex_lock(&st
->send_lock
);
266 if (!list_empty(&st
->send_list
)) {
267 r
= list_first_entry(&st
->send_list
, struct dnet_io_req
, req_entry
);
269 dnet_unschedule_send(st
);
271 pthread_mutex_unlock(&st
->send_lock
);
278 err
= dnet_send_request(st
, r
);
/*
 * Register one socket of @st with the state's epoll set (EPOLL_CTL_ADD).
 *
 * @send selects the direction; it is passed as 1 by dnet_schedule_send()
 * and 0 by dnet_schedule_recv(), and it picks "SEND" or "RECV" in the
 * failure log. -EEXIST is singled out for separate handling.
 * NOTE(review): the selection of `fd` and of the receive event mask, the
 * conversion of the epoll_ctl() result into a negative errno, and the
 * body of the -EEXIST branch are on lines missing from this listing.
 */
287 static int dnet_schedule_network_io(struct dnet_net_state
*st
, int send
)
289 struct epoll_event ev
;
293 ev
.events
= EPOLLOUT
;
301 err
= epoll_ctl(st
->epoll_fd
, EPOLL_CTL_ADD
, fd
, &ev
);
305 if (err
== -EEXIST
) {
308 dnet_log_err(st
->n
, "%s: failed to add %s event", dnet_state_dump_addr(st
), send
? "SEND" : "RECV");
/*
 * Arm the state's socket for outgoing data.
 *
 * Thin wrapper: asks dnet_schedule_network_io() for the send direction
 * (second argument 1) and returns its result unchanged.
 */
int dnet_schedule_send(struct dnet_net_state *st)
{
	const int want_send = 1;

	return dnet_schedule_network_io(st, want_send);
}
/*
 * Arm the state's socket for incoming data.
 *
 * Thin wrapper: asks dnet_schedule_network_io() for the receive direction
 * (second argument 0) and returns its result unchanged.
 */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	const int want_send = 0;

	return dnet_schedule_network_io(st, want_send);
}
/*
 * Epoll handler for a connected state.
 *
 * EPOLLIN runs dnet_process_recv_single(), EPOLLOUT runs
 * dnet_process_send_single(); after each, a non-zero result other than
 * -EAGAIN is singled out. EPOLLHUP/EPOLLERR events are logged with the
 * raw event mask. err starts out as -ECONNRESET.
 * NOTE(review): the statements executed under the error checks
 * (presumably an early exit) and the final return are on lines missing
 * from this listing.
 */
325 int dnet_state_net_process(struct dnet_net_state
*st
, struct epoll_event
*ev
)
327 int err
= -ECONNRESET
;
329 if (ev
->events
& EPOLLIN
) {
330 err
= dnet_process_recv_single(st
);
331 if (err
&& (err
!= -EAGAIN
))
334 if (ev
->events
& EPOLLOUT
) {
335 err
= dnet_process_send_single(st
);
336 if (err
&& (err
!= -EAGAIN
))
340 if (ev
->events
& (EPOLLHUP
| EPOLLERR
)) {
341 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: received error event mask %x\n", dnet_state_dump_addr(st
), ev
->events
);
/*
 * Thread body of one network I/O slot (@data_ is its struct dnet_net_io).
 *
 * Until n->need_exit is set, the loop waits up to 1000 ms for a single
 * epoll event on nio->epoll_fd, records the epoll fd in the state and
 * dispatches the event to st->process(). The state is reset when the
 * handler returns a negative error or st->stall has reached
 * DNET_DEFAULT_STALL_TRANSACTIONS. The tail of the loop expires
 * transactions: entries are moved off st->trans_list under
 * st->trans_lock onto a local list and completed with status -ETIMEDOUT
 * after the lock has been dropped.
 *
 * Returns the address of n->need_exit.
 * NOTE(review): how `st` is derived from the event, the statements under
 * several `if` lines (continue/break), and some declarations are missing
 * from this listing.
 */
348 static void *dnet_io_process_network(void *data_
)
350 struct dnet_net_io
*nio
= data_
;
351 struct dnet_node
*n
= nio
->n
;
352 struct dnet_net_state
*st
;
353 struct epoll_event ev
;
355 struct dnet_trans
*t
, *tmp
;
357 struct list_head head
;
359 dnet_set_name("net_pool");
361 while (!n
->need_exit
) {
362 err
= epoll_wait(nio
->epoll_fd
, &ev
, 1, 1000);
369 if (err
== -EAGAIN
|| err
== -EINTR
)
372 dnet_log_err(n
, "Failed to wait for IO fds");
378 st
->epoll_fd
= nio
->epoll_fd
;
382 err
= st
->process(st
, &ev
);
386 if (err
== -EAGAIN
&& st
->stall
< DNET_DEFAULT_STALL_TRANSACTIONS
)
389 if (err
< 0 || st
->stall
>= DNET_DEFAULT_STALL_TRANSACTIONS
) {
390 dnet_state_reset(st
);
399 gettimeofday(&tv
, NULL
);
401 INIT_LIST_HEAD(&head
);
/* Collect expired transactions under the lock; complete them later. */
403 pthread_mutex_lock(&st
->trans_lock
);
404 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_list_entry
) {
/* Entries whose deadline second is not yet in the past are not expired. */
405 if (t
->time
.tv_sec
>= tv
.tv_sec
)
408 dnet_trans_remove_nolock(&st
->trans_root
, t
);
409 list_move(&t
->trans_list_entry
, &head
);
411 pthread_mutex_unlock(&st
->trans_lock
);
413 list_for_each_entry_safe(t
, tmp
, &head
, trans_list_entry
) {
414 list_del_init(&t
->trans_list_entry
);
418 t
->cmd
.status
= -ETIMEDOUT
;
420 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: destructing trans: %llu on TIMEOUT\n",
421 dnet_state_dump_addr(st
), (unsigned long long)t
->trans
);
424 t
->complete(st
, &t
->cmd
, t
->priv
);
430 return &n
->need_exit
;
433 static void dnet_io_cleanup_states(struct dnet_node
*n
)
435 struct dnet_net_state
*st
, *tmp
;
437 list_for_each_entry_safe(st
, tmp
, &n
->storage_state_list
, storage_state_entry
) {
438 dnet_state_reset(st
);
/*
 * NOTE(review): the members of this struct are on lines missing from
 * this listing, and no code visible here refers to it - confirm whether
 * it is still used.
 */
442 struct dnet_io_process_data
{
/*
 * Thread body of one work-pool worker (@data_ is its struct dnet_work_io).
 *
 * Until n->need_exit is set, the loop takes the first request from
 * pool->list under pool->lock, unlinks it, releases the lock and passes
 * the request to dnet_process_recv(). pthread_cond_timedwait() is given
 * an absolute deadline one second after the gettimeofday() reading, and
 * the list is examined again afterwards.
 *
 * NOTE(review): the `else` between the first list check and the timed
 * wait, the handling of "still no request", the assignment of `st`, and
 * the release of the request after processing are on lines missing from
 * this listing.
 */
447 static void *dnet_io_process(void *data_
)
449 struct dnet_work_io
*wio
= data_
;
450 struct dnet_work_pool
*pool
= wio
->pool
;
451 struct dnet_node
*n
= pool
->n
;
452 struct dnet_net_state
*st
;
455 struct dnet_io_req
*r
;
458 dnet_set_name("io_pool");
460 while (!n
->need_exit
) {
/* Absolute deadline for the timed wait: now + 1 second. */
464 gettimeofday(&tv
, NULL
);
465 ts
.tv_sec
= tv
.tv_sec
+ 1;
466 ts
.tv_nsec
= tv
.tv_usec
* 1000;
468 pthread_mutex_lock(&pool
->lock
);
470 if (!list_empty(&pool
->list
)) {
471 r
= list_first_entry(&pool
->list
, struct dnet_io_req
, req_entry
);
473 err
= pthread_cond_timedwait(&pool
->wait
, &pool
->lock
, &ts
);
474 if (!list_empty(&pool
->list
)) {
475 r
= list_first_entry(&pool
->list
, struct dnet_io_req
, req_entry
);
481 list_del_init(&r
->req_entry
);
482 pthread_mutex_unlock(&pool
->lock
);
489 dnet_log(n
, DNET_LOG_DEBUG
, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, mode: %s\n",
490 dnet_state_dump_addr(st
), dnet_dump_id(r
->header
), r
, r
->hsize
, r
->dsize
, dnet_work_io_mode_str(pool
->mode
));
492 err
= dnet_process_recv(st
, r
);
/*
 * Tear down a work pool: join all pool->num worker threads, unlink every
 * request still on pool->list, then destroy the condition variable and
 * the mutex.
 * NOTE(review): what is done with each unlinked request and with the
 * pool memory itself is on lines missing from this listing.
 */
501 static void dnet_work_pool_cleanup(struct dnet_work_pool
*pool
)
503 struct dnet_io_req
*r
, *tmp
;
506 for (i
= 0; i
< pool
->num
; ++i
) {
507 struct dnet_work_io
*wio
= &pool
->wio
[i
];
509 pthread_join(wio
->tid
, NULL
);
512 list_for_each_entry_safe(r
, tmp
, &pool
->list
, req_entry
) {
513 list_del(&r
->req_entry
);
517 pthread_cond_destroy(&pool
->wait
);
518 pthread_mutex_destroy(&pool
->lock
);
/*
 * Create a work pool with @num worker threads running @process.
 *
 * The pool header and its @num struct dnet_work_io slots come from a
 * single zeroed allocation. The request list, mutex and condition
 * variable are initialised, then one thread is started per slot with the
 * slot as its argument. If thread creation fails the error is logged and
 * the unwind code joins threads and destroys the condition variable and
 * mutex.
 *
 * NOTE(review): the use of @mode, the remaining per-slot assignments,
 * the header of the unwind loop (i.e. which threads get joined), the
 * freeing of the pool and the return statements are on lines missing
 * from this listing.
 */
522 static struct dnet_work_pool
*dnet_work_pool_alloc(struct dnet_node
*n
, int num
, int mode
, void *(* process
)(void *))
524 struct dnet_work_pool
*pool
;
527 pool
= malloc(sizeof(struct dnet_work_pool
) + num
* sizeof(struct dnet_work_io
));
533 memset(pool
, 0, sizeof(struct dnet_work_pool
) + num
* sizeof(struct dnet_work_io
));
538 INIT_LIST_HEAD(&pool
->list
);
540 err
= pthread_mutex_init(&pool
->lock
, NULL
);
546 err
= pthread_cond_init(&pool
->wait
, NULL
);
549 goto err_out_mutex_destroy
;
552 for (i
= 0; i
< num
; ++i
) {
553 struct dnet_work_io
*wio
= &pool
->wio
[i
];
555 wio
->thread_index
= i
;
558 err
= pthread_create(&wio
->tid
, NULL
, process
, wio
);
561 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create IO thread: %d\n", err
);
562 goto err_out_io_threads
;
570 struct dnet_work_io
*wio
= &pool
->wio
[i
];
571 pthread_join(wio
->tid
, NULL
);
573 pthread_cond_destroy(&pool
->wait
);
574 err_out_mutex_destroy
:
575 pthread_mutex_destroy(&pool
->lock
);
/*
 * Set up the I/O machinery of node @n from configuration @cfg.
 *
 * One zeroed allocation holds struct dnet_io followed by
 * cfg->net_thread_num struct dnet_net_io slots. Three receive work pools
 * are created, all served by dnet_io_process: the blocking pool with
 * cfg->io_thread_num threads, and the non-blocking and exec-blocking
 * pools with cfg->nonblocking_io_thread_num threads each. Every network
 * slot then gets its own epoll fd and a dnet_io_process_network thread.
 *
 * Failures unwind through the err_out_* labels, which release what was
 * already created in reverse order of creation.
 * NOTE(review): several lines of this function (local declarations,
 * error-code assignments, the header of the unwind loop, the return
 * statements) are missing from this listing; only the visible statements
 * are described and they are kept in their original shape.
 */
582 int dnet_io_init(struct dnet_node
*n
, struct dnet_config
*cfg
)
586 int io_size
= sizeof(struct dnet_io
) + sizeof(struct dnet_net_io
) * cfg
->net_thread_num
;
588 io
= malloc(io_size
);
594 memset(io
, 0, io_size
);
596 io
->net_thread_num
= cfg
->net_thread_num
;
597 io
->net_thread_pos
= 0;
/* The network slots live directly behind the struct dnet_io header. */
598 io
->net
= (struct dnet_net_io
*)(io
+ 1);
600 io
->recv_pool
= dnet_work_pool_alloc(n
, cfg
->io_thread_num
, DNET_WORK_IO_MODE_BLOCKING
, dnet_io_process
);
601 if (!io
->recv_pool
) {
606 io
->recv_pool_nb
= dnet_work_pool_alloc(n
, cfg
->nonblocking_io_thread_num
, DNET_WORK_IO_MODE_NONBLOCKING
, dnet_io_process
);
607 if (!io
->recv_pool_nb
) {
609 goto err_out_free_recv_pool
;
612 io
->recv_pool_eblock
= dnet_work_pool_alloc(n
, cfg
->nonblocking_io_thread_num
, DNET_WORK_IO_MODE_EXEC_BLOCKING
, dnet_io_process
);
/*
 * Test the pool that was just allocated. recv_pool_nb is already known
 * to be non-NULL at this point, so checking it would let a failed
 * exec-blocking allocation pass and leave a NULL pool whose lock
 * dnet_schedule_io() would later take.
 */
613 if (!io
->recv_pool_eblock
) {
615 goto err_out_free_recv_pool_nb
;
618 for (i
=0; i
<io
->net_thread_num
; ++i
) {
619 struct dnet_net_io
*nio
= &io
->net
[i
];
623 nio
->epoll_fd
= epoll_create(10000);
624 if (nio
->epoll_fd
< 0) {
626 dnet_log_err(n
, "Failed to create epoll fd");
627 goto err_out_net_destroy
;
630 fcntl(nio
->epoll_fd
, F_SETFD
, FD_CLOEXEC
);
631 fcntl(nio
->epoll_fd
, F_SETFL
, O_NONBLOCK
);
633 err
= pthread_create(&nio
->tid
, NULL
, dnet_io_process_network
, nio
);
/* This slot has no thread: close its epoll fd before unwinding. */
635 close(nio
->epoll_fd
);
637 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create network processing thread: %d\n", err
);
638 goto err_out_net_destroy
;
647 pthread_join(io
->net
[i
].tid
, NULL
);
648 close(io
->net
[i
].epoll_fd
);
651 dnet_work_pool_cleanup(io
->recv_pool_eblock
);
652 err_out_free_recv_pool_nb
:
653 dnet_work_pool_cleanup(io
->recv_pool_nb
);
654 err_out_free_recv_pool
:
655 dnet_work_pool_cleanup(io
->recv_pool
);
/*
 * Shut down the I/O machinery of node @n: join every network thread and
 * close its epoll fd, clean up the exec-blocking, non-blocking and
 * blocking receive pools in that order, then reset all network states
 * via dnet_io_cleanup_states().
 * NOTE(review): the lines before the first loop and anything after the
 * last visible call (e.g. releasing n->io) are not part of this listing.
 */
663 void dnet_io_exit(struct dnet_node
*n
)
665 struct dnet_io
*io
= n
->io
;
670 for (i
=0; i
<io
->net_thread_num
; ++i
) {
671 pthread_join(io
->net
[i
].tid
, NULL
);
672 close(io
->net
[i
].epoll_fd
);
675 dnet_work_pool_cleanup(io
->recv_pool_eblock
);
676 dnet_work_pool_cleanup(io
->recv_pool_nb
);
677 dnet_work_pool_cleanup(io
->recv_pool
);
679 dnet_io_cleanup_states(n
);