2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
26 #include "elliptics.h"
27 #include "elliptics/interface.h"
29 static void dnet_schedule_io(struct dnet_node
*n
, struct dnet_io_req
*r
)
31 struct dnet_io
*io
= n
->io
;
32 struct dnet_cmd
*cmd
= r
->header
;
33 int nonblocking
= !!(cmd
->flags
& DNET_FLAGS_NOLOCK
);
34 struct dnet_attr
*attr
= r
->header
+ sizeof(struct dnet_cmd
);
36 dnet_log(r
->st
->n
, DNET_LOG_DSA
, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
37 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(attr
->cmd
),
38 (unsigned long long)cmd
->size
, nonblocking
);
40 pthread_mutex_lock(&io
->recv_lock
);
43 list_add_tail(&r
->req_entry
, &io
->nonblocking_recv_list
);
45 list_add_tail(&r
->req_entry
, &io
->recv_list
);
47 pthread_cond_broadcast(&io
->recv_wait
);
48 pthread_mutex_unlock(&io
->recv_lock
);
/*
 * Put the receive side of @st back into "expecting a command header" mode:
 * rcv_flags is set to DNET_IO_CMD and rcv_end to sizeof(struct dnet_cmd).
 *
 * NOTE(review): several original lines between the visible statements are
 * missing from this excerpt (presumably the handling of st->rcv_data and of
 * st->rcv_offset) -- confirm against the complete file.
 */
51 void dnet_schedule_command(struct dnet_net_state
*st
)
53 st
->rcv_flags
= DNET_IO_CMD
;
57 struct dnet_cmd
*c
= &st
->rcv_cmd
;
/* Transaction number with the DNET_TRANS_REPLY bit masked off. */
58 unsigned long long tid
= c
->trans
& ~DNET_TRANS_REPLY
;
/* "reply" is printed as 1 when the reply bit was set in c->trans. */
59 dnet_log(st
->n
, DNET_LOG_DSA
, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
60 (unsigned long long)c
->size
, tid
, tid
!= c
->trans
, st
->rcv_data
);
/* The next thing to be received is exactly one command header. */
66 st
->rcv_end
= sizeof(struct dnet_cmd
);
/*
 * Drive the receive state machine of @st.
 *
 * Visible steps:
 *  - recv() from st->read_s into the window [rcv_offset, rcv_end) of the
 *    current buffer and advance rcv_offset by the number of bytes read;
 *  - once a command header is complete, allocate one buffer holding a
 *    struct dnet_io_req, the struct dnet_cmd and c->size payload bytes, copy
 *    the header into it, and move the window to cover the payload;
 *  - when the message is complete, reset the receive state with
 *    dnet_schedule_command(), take a state reference with dnet_state_get()
 *    and queue the request with dnet_schedule_io().
 *
 * On the exit path the receive state is reset unless err is -EAGAIN or
 * -EINTR.
 *
 * NOTE(review): labels, several branches and the local declarations of
 * data/size/err are missing from this excerpt -- confirm the exact control
 * flow against the complete file.
 */
70 static int dnet_process_recv_single(struct dnet_net_state
*st
)
72 struct dnet_node
*n
= st
->n
;
73 struct dnet_io_req
*r
;
80 * Reading command first.
82 if (st
->rcv_flags
& DNET_IO_CMD
)
/* Receive window: the bytes still missing from the current buffer. */
86 data
+= st
->rcv_offset
;
87 size
= st
->rcv_end
- st
->rcv_offset
;
90 err
= recv(st
->read_s
, data
, size
, 0);
/* EAGAIN and EINTR are not logged as receive failures. */
93 if (errno
!= EAGAIN
&& errno
!= EINTR
) {
95 dnet_log_err(n
, "failed to receive data, socket: %d", st
->read_s
);
/* NOTE(review): presumably reached when recv() returned 0 -- confirm. */
103 dnet_log(n
, DNET_LOG_ERROR
, "Peer %s has disconnected.\n",
104 dnet_server_convert_dnet_addr(&st
->addr
));
109 st
->rcv_offset
+= err
;
112 if (st
->rcv_offset
!= st
->rcv_end
)
/* A complete command header has been received. */
115 if (st
->rcv_flags
& DNET_IO_CMD
) {
116 unsigned long long tid
;
117 struct dnet_cmd
*c
= &st
->rcv_cmd
;
121 tid
= c
->trans
& ~DNET_TRANS_REPLY
;
123 dnet_log(n
, DNET_LOG_DSA
, "%s: received trans: %llu / %llx, "
124 "reply: %d, size: %llu, flags: %x, status: %d.\n",
125 dnet_dump_id(&c
->id
), tid
, (unsigned long long)c
->trans
,
126 !!(c
->trans
& DNET_TRANS_REPLY
),
127 (unsigned long long)c
->size
, c
->flags
, c
->status
);
/*
 * One allocation for request descriptor + header + payload.
 * NOTE(review): c->size comes from the received header; no upper bound or
 * overflow check on this sum is visible here -- confirm.
 */
129 r
= malloc(c
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_req
));
/* Only the descriptor is zeroed; header and payload are filled below. */
134 memset(r
, 0, sizeof(struct dnet_io_req
));
137 r
->hsize
= sizeof(struct dnet_cmd
);
138 memcpy(r
->header
, &st
->rcv_cmd
, sizeof(struct dnet_cmd
));
/* Move the receive window behind descriptor and header, over the payload. */
141 st
->rcv_offset
= sizeof(struct dnet_io_req
) + sizeof(struct dnet_cmd
);
142 st
->rcv_end
= st
->rcv_offset
+ c
->size
;
143 st
->rcv_flags
&= ~DNET_IO_CMD
;
146 r
->data
= r
->header
+ sizeof(struct dnet_cmd
);
150 * We read the command header, now get the data.
/* Message complete: re-arm for the next header and queue the request. */
159 dnet_schedule_command(st
);
161 r
->st
= dnet_state_get(st
);
163 dnet_schedule_io(n
, r
);
/* Exit path: keep the partial receive state only for -EAGAIN / -EINTR. */
167 if (err
!= -EAGAIN
&& err
!= -EINTR
)
168 dnet_schedule_command(st
);
/*
 * Accept one incoming connection on the listening state @orig and create a
 * network state for it.  @ev is unused.
 *
 * Visible steps: accept() on orig->read_s, dnet_set_sockopt() on the new
 * socket, dnet_state_create() with dnet_state_net_process as the event
 * handler, and a log line for either the failure or the accepted client.
 *
 * NOTE(review): the error branches and the declarations of cs/err are not
 * visible in this excerpt -- confirm the return values against the full file.
 */
173 int dnet_state_accept_process(struct dnet_net_state
*orig
, struct epoll_event
*ev __unused
)
175 struct dnet_node
*n
= orig
->n
;
177 struct dnet_addr addr
;
178 struct dnet_net_state
*st
;
180 memset(&addr
, 0, sizeof(addr
));
/* accept() is told how much room the address buffer provides. */
182 addr
.addr_len
= sizeof(addr
.addr
);
183 cs
= accept(orig
->read_s
, (struct sockaddr
*)&addr
.addr
, &addr
.addr_len
);
187 dnet_log_err(n
, "failed to accept new client at %s", dnet_state_dump_addr(orig
));
191 dnet_set_sockopt(cs
);
/* The new state processes its events through dnet_state_net_process(). */
193 st
= dnet_state_create(n
, 0, NULL
, 0, &addr
, cs
, &err
, 0, dnet_state_net_process
);
195 dnet_log(n
, DNET_LOG_ERROR
, "%s: Failed to create state for accepted client: %s [%d]\n",
196 dnet_server_convert_dnet_addr(&addr
), strerror(-err
), -err
);
201 dnet_log(n
, DNET_LOG_INFO
, "Accepted client %s, socket: %d.\n",
202 dnet_server_convert_dnet_addr(&addr
), cs
);
205 /* socket is closed in dnet_state_create() */
/*
 * Remove the write socket of @st (st->write_s) from the epoll set
 * st->epoll_fd.  The result of epoll_ctl() is not examined.
 *
 * NOTE(review): one original line between the visible statements is missing
 * from this excerpt -- confirm how the rest of ev is initialised.
 */
210 void dnet_unschedule_send(struct dnet_net_state
*st
)
212 struct epoll_event ev
;
214 ev
.events
= EPOLLOUT
;
217 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->write_s
, &ev
);
/*
 * Remove the read socket of @st (st->read_s) from the epoll set
 * st->epoll_fd.  The result of epoll_ctl() is not examined.
 *
 * NOTE(review): the lines initialising ev are missing from this excerpt --
 * confirm against the complete file.
 */
220 void dnet_unschedule_recv(struct dnet_net_state
*st
)
222 struct epoll_event ev
;
227 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->read_s
, &ev
);
/*
 * Send one queued request of @st.
 *
 * Under st->send_lock the first entry of st->send_list is picked when the
 * list is not empty; the request is then passed to dnet_send_request().
 *
 * NOTE(review): the branch structure is not visible in this excerpt.
 * dnet_unschedule_send() is presumably called when the list is empty, and
 * the handling of the result of dnet_send_request() is missing -- confirm.
 */
230 static int dnet_process_send_single(struct dnet_net_state
*st
)
232 struct dnet_io_req
*r
= NULL
;
238 pthread_mutex_lock(&st
->send_lock
);
239 if (!list_empty(&st
->send_list
)) {
/* The entry is only looked at here; no unlinking is visible. */
240 r
= list_first_entry(&st
->send_list
, struct dnet_io_req
, req_entry
);
242 dnet_unschedule_send(st
);
244 pthread_mutex_unlock(&st
->send_lock
);
251 err
= dnet_send_request(st
, r
);
/*
 * Add a socket of @st to the epoll set st->epoll_fd with EPOLL_CTL_ADD.
 * A non-zero @send is reported as "SEND" in the failure message, zero as
 * "RECV".
 *
 * NOTE(review): the selection of fd and of the event mask for the receive
 * case, and the conversion of the epoll_ctl() result into the value compared
 * with -EEXIST, are missing from this excerpt -- confirm against the full file.
 */
260 static int dnet_schedule_network_io(struct dnet_net_state
*st
, int send
)
262 struct epoll_event ev
;
266 ev
.events
= EPOLLOUT
;
274 err
= epoll_ctl(st
->epoll_fd
, EPOLL_CTL_ADD
, fd
, &ev
);
/* -EEXIST is handled separately from the failures logged below. */
278 if (err
== -EEXIST
) {
281 dnet_log_err(st
->n
, "%s: failed to add %s event", dnet_state_dump_addr(st
), send
? "SEND" : "RECV");
/*
 * Ask for SEND events on @st.
 * Returns whatever dnet_schedule_network_io() returns.
 */
int dnet_schedule_send(struct dnet_net_state *st)
{
	const int want_send = 1;

	return dnet_schedule_network_io(st, want_send);
}
/*
 * Ask for RECV events on @st.
 * Returns whatever dnet_schedule_network_io() returns.
 */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	const int want_send = 0;

	return dnet_schedule_network_io(st, want_send);
}
/*
 * Event handler for a connected state: dispatch the epoll event @ev.
 *
 * EPOLLIN is served by dnet_process_recv_single(), EPOLLOUT by
 * dnet_process_send_single(); EPOLLHUP or EPOLLERR is logged as an error
 * event.  err starts out as -ECONNRESET.
 *
 * NOTE(review): the statements executed after the two error checks and
 * after the error-event log are missing from this excerpt -- confirm.
 */
298 int dnet_state_net_process(struct dnet_net_state
*st
, struct epoll_event
*ev
)
300 int err
= -ECONNRESET
;
302 if (ev
->events
& EPOLLIN
) {
303 err
= dnet_process_recv_single(st
);
/* -EAGAIN is not treated as a failure of the receive step. */
304 if (err
&& (err
!= -EAGAIN
))
307 if (ev
->events
& EPOLLOUT
) {
308 err
= dnet_process_send_single(st
);
/* -EAGAIN is not treated as a failure of the send step. */
309 if (err
&& (err
!= -EAGAIN
))
313 if (ev
->events
& (EPOLLHUP
| EPOLLERR
)) {
314 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: received error event mask %x\n", dnet_state_dump_addr(st
), ev
->events
);
/*
 * Body of a network processing thread; @data_ is its struct dnet_net_io.
 *
 * Until n->need_exit is set the thread waits for one epoll event with a
 * 1000 ms timeout, records the epoll fd in the state, and runs the state's
 * process() callback.  The state is reset when the callback fails or the
 * stall counter reaches DNET_DEFAULT_STALL_TRANSACTIONS.  Transactions are
 * then moved from st->trans_list to a private list under st->trans_lock and
 * completed with status -ETIMEDOUT.
 *
 * Returns the address of n->need_exit.
 *
 * NOTE(review): how st is obtained from the event, the declarations of
 * err/tv and a number of branches are missing from this excerpt -- confirm
 * the exact control flow against the complete file.
 */
321 static void *dnet_io_process(void *data_
)
323 struct dnet_net_io
*nio
= data_
;
324 struct dnet_node
*n
= nio
->n
;
325 struct dnet_net_state
*st
;
326 struct epoll_event ev
;
328 struct dnet_trans
*t
, *tmp
;
330 struct list_head head
;
332 dnet_set_name("net_pool");
333 dnet_log(n
, DNET_LOG_NOTICE
, "Starting network processing thread.\n");
335 while (!n
->need_exit
) {
/* One event per wakeup, at most 1000 ms of waiting. */
336 err
= epoll_wait(nio
->epoll_fd
, &ev
, 1, 1000);
343 if (err
== -EAGAIN
|| err
== -EINTR
)
346 dnet_log_err(n
, "Failed to wait for IO fds");
/* Record which epoll set the state is currently served from. */
352 st
->epoll_fd
= nio
->epoll_fd
;
356 err
= st
->process(st
, &ev
);
360 if (err
== -EAGAIN
&& st
->stall
< DNET_DEFAULT_STALL_TRANSACTIONS
)
/* Hard error, or too many stalled transactions: reset the state. */
363 if (err
< 0 || st
->stall
>= DNET_DEFAULT_STALL_TRANSACTIONS
) {
364 dnet_state_reset(st
);
373 gettimeofday(&tv
, NULL
);
375 INIT_LIST_HEAD(&head
);
/* Collect transactions under the lock, complete them after dropping it. */
377 pthread_mutex_lock(&st
->trans_lock
);
378 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_list_entry
) {
/*
 * NOTE(review): the statement guarded by this check is not visible;
 * presumably it stops at the first transaction that has not expired yet.
 */
379 if (t
->time
.tv_sec
>= tv
.tv_sec
)
382 dnet_trans_remove_nolock(&st
->trans_root
, t
);
383 list_move(&t
->trans_list_entry
, &head
);
385 pthread_mutex_unlock(&st
->trans_lock
);
387 list_for_each_entry_safe(t
, tmp
, &head
, trans_list_entry
) {
388 list_del_init(&t
->trans_list_entry
);
392 t
->cmd
.status
= -ETIMEDOUT
;
394 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: destructing trans: %llu on TIMEOUT\n",
395 dnet_state_dump_addr(st
), (unsigned long long)t
->trans
);
/* The completion callback gets the command with no attribute (NULL). */
398 t
->complete(st
, &t
->cmd
, NULL
, t
->priv
);
404 dnet_log(n
, DNET_LOG_NOTICE
, "Exiting network processing thread: need_exit: %d, err: %d.\n", n
->need_exit
, err
);
405 return &n
->need_exit
;
408 static void dnet_io_cleanup_states(struct dnet_node
*n
)
410 struct dnet_net_state
*st
, *tmp
;
412 list_for_each_entry_safe(st
, tmp
, &n
->storage_state_list
, storage_state_entry
) {
413 dnet_state_reset(st
);
/*
 * NOTE(review): the members of this structure are not visible in this
 * excerpt, and no use of it appears in the visible code -- confirm its
 * purpose against the complete file.
 */
417 struct dnet_io_process_data
{
/*
 * Body of an IO worker thread; @data_ is its struct dnet_work_io.
 *
 * Until n->need_exit is set the worker takes the first request from its
 * queue under io->recv_lock: io->nonblocking_recv_list when
 * wio->nonblocking is set, io->recv_list otherwise.  The visible code also
 * waits on io->recv_wait with a deadline of roughly one second from now and
 * looks at the queue again.  A dequeued request is passed to
 * dnet_process_recv().
 *
 * NOTE(review): the declarations of err/tv/ts, the branch around the timed
 * wait, the handling of an empty queue, how st is derived from the request,
 * and the release of the request are missing from this excerpt -- confirm.
 */
422 static void *dnet_io_process_pool(void *data_
)
424 struct dnet_work_io
*wio
= data_
;
425 struct dnet_node
*n
= wio
->n
;
426 struct dnet_net_state
*st
;
427 struct dnet_io
*io
= n
->io
;
430 struct dnet_io_req
*r
;
431 struct list_head
*head
;
434 dnet_log(n
, DNET_LOG_NOTICE
, "Starting %s IO processing thread.\n", wio
->nonblocking
? "nonblocking" : "blocking");
435 dnet_set_name("io_pool");
437 while (!n
->need_exit
) {
/* Absolute deadline for the timed wait: current time plus one second. */
441 gettimeofday(&tv
, NULL
);
442 ts
.tv_sec
= tv
.tv_sec
+ 1;
443 ts
.tv_nsec
= tv
.tv_usec
* 1000;
445 pthread_mutex_lock(&io
->recv_lock
);
/* Default queue is the blocking one; nonblocking workers override it. */
446 head
= &io
->recv_list
;
448 if (wio
->nonblocking
)
449 head
= &io
->nonblocking_recv_list
;
451 if (!list_empty(head
)) {
452 r
= list_first_entry(head
, struct dnet_io_req
, req_entry
);
/* The queue is examined again after the wait returns. */
454 err
= pthread_cond_timedwait(&io
->recv_wait
, &io
->recv_lock
, &ts
);
455 if (!list_empty(head
)) {
456 r
= list_first_entry(head
, struct dnet_io_req
, req_entry
);
/* Unlink the request before dropping the lock. */
462 list_del_init(&r
->req_entry
);
463 pthread_mutex_unlock(&io
->recv_lock
);
470 dnet_log(n
, DNET_LOG_DSA
, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, nonblocking: %d\n",
471 dnet_state_dump_addr(st
), dnet_dump_id(r
->header
), r
, r
->hsize
, r
->dsize
, wio
->nonblocking
);
473 err
= dnet_process_recv(st
, r
);
479 dnet_log(n
, DNET_LOG_DSA
, "Exiting IO processing thread: need_exit: %d, err: %d.\n", n
->need_exit
, err
);
/*
 * Set up the IO subsystem of node @n according to @cfg.
 *
 * One zeroed allocation holds struct dnet_io followed by
 * cfg->net_thread_num struct dnet_net_io entries and
 * (cfg->io_thread_num + cfg->nonblocking_io_thread_num) struct dnet_work_io
 * entries.  The function initialises both receive lists, the recv_wait
 * condition and the recv_lock mutex, creates an epoll fd and a
 * dnet_io_process thread per network slot, and starts one
 * dnet_io_process_pool thread per worker slot; slots with an index of
 * io->thread_num or above are marked nonblocking.
 *
 * The tail of the function is the error unwinding: it joins the threads,
 * closes the epoll fds and destroys the mutex and the condition.
 *
 * NOTE(review): error checks, labels, the assignment of n->io, the return
 * statements and the release of the allocation are missing from this
 * excerpt -- confirm against the complete file.
 * NOTE(review): io_size is an int computed from sizeof products with
 * configuration values; no range check on those values is visible.
 */
483 int dnet_io_init(struct dnet_node
*n
, struct dnet_config
*cfg
)
487 int io_size
= sizeof(struct dnet_io
) +
488 sizeof(struct dnet_net_io
) * cfg
->net_thread_num
+
489 sizeof(struct dnet_work_io
) * (cfg
->io_thread_num
+ cfg
->nonblocking_io_thread_num
);
491 io
= malloc(io_size
);
497 memset(io
, 0, io_size
);
499 io
->nonblocking_thread_num
= cfg
->nonblocking_io_thread_num
;
500 io
->thread_num
= cfg
->io_thread_num
;
501 io
->net_thread_num
= cfg
->net_thread_num
;
/* Carve the two trailing arrays out of the single allocation. */
503 io
->net
= (struct dnet_net_io
*)(io
+ 1);
504 io
->wio
= (struct dnet_work_io
*)(io
->net
+ cfg
->net_thread_num
);
506 INIT_LIST_HEAD(&io
->recv_list
);
507 INIT_LIST_HEAD(&io
->nonblocking_recv_list
);
510 err
= pthread_cond_init(&io
->recv_wait
, NULL
);
/* NOTE(review): the message says "send cond" but the object is recv_wait. */
513 dnet_log(n
, DNET_LOG_ERROR
, "Failed to initialize send cond: %d\n", err
);
517 err
= pthread_mutex_init(&io
->recv_lock
, NULL
);
/* NOTE(review): the message says "send lock" but the object is recv_lock. */
520 dnet_log(n
, DNET_LOG_ERROR
, "Failed to initialize send lock: %d\n", err
);
521 goto err_out_recv_cond
;
/* One epoll set and one network thread per slot. */
524 for (i
=0; i
<io
->net_thread_num
; ++i
) {
525 struct dnet_net_io
*nio
= &io
->net
[i
];
529 nio
->epoll_fd
= epoll_create(10000);
530 if (nio
->epoll_fd
< 0) {
532 dnet_log_err(n
, "Failed to create epoll fd");
533 goto err_out_net_destroy
;
/* NOTE(review): the results of both fcntl() calls are ignored. */
536 fcntl(nio
->epoll_fd
, F_SETFD
, FD_CLOEXEC
);
537 fcntl(nio
->epoll_fd
, F_SETFL
, O_NONBLOCK
);
539 err
= pthread_create(&nio
->tid
, NULL
, dnet_io_process
, nio
);
/* This slot's epoll fd is closed here, before the unwind path runs. */
541 close(nio
->epoll_fd
);
543 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create network processing thread: %d\n", err
);
544 goto err_out_net_destroy
;
548 dnet_log(n
, DNET_LOG_INFO
, "Starting %d blocking threads and %d nonblocking threads\n", io
->thread_num
, io
->nonblocking_thread_num
);
/* Blocking workers occupy the first thread_num slots, nonblocking the rest. */
549 for (i
=0; i
<io
->thread_num
+ io
->nonblocking_thread_num
; ++i
) {
550 struct dnet_work_io
*wio
= &io
->wio
[i
];
553 wio
->thread_index
= i
;
555 if (i
>= io
->thread_num
)
556 wio
->nonblocking
= 1;
558 err
= pthread_create(&wio
->tid
, NULL
, dnet_io_process_pool
, wio
);
561 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create IO thread: %d\n", err
);
562 goto err_out_io_threads
;
/* Error unwinding: worker threads first, then network threads. */
570 pthread_join(io
->wio
[i
].tid
, NULL
);
572 i
= io
->net_thread_num
;
575 pthread_join(io
->net
[i
].tid
, NULL
);
576 close(io
->net
[i
].epoll_fd
);
579 pthread_mutex_destroy(&io
->recv_lock
);
581 pthread_cond_destroy(&io
->recv_wait
);
/*
 * Tear down the IO subsystem of node @n.
 *
 * Visible order: join all worker threads (blocking and nonblocking), join
 * every network thread and close its epoll fd, reset the network states via
 * dnet_io_cleanup_states(), unlink every request left on both receive
 * lists, and destroy the recv_lock mutex and the recv_wait condition.
 *
 * NOTE(review): the bodies of the two drain loops beyond list_del(), the
 * declaration of i, and anything after the last visible statement (such as
 * releasing io itself) are not visible in this excerpt -- confirm.
 */
588 void dnet_io_exit(struct dnet_node
*n
)
590 struct dnet_io
*io
= n
->io
;
591 struct dnet_io_req
*r
, *tmp
;
596 for (i
=0; i
<io
->thread_num
+ io
->nonblocking_thread_num
; ++i
)
597 pthread_join(io
->wio
[i
].tid
, NULL
);
599 for (i
=0; i
<io
->net_thread_num
; ++i
) {
600 pthread_join(io
->net
[i
].tid
, NULL
);
601 close(io
->net
[i
].epoll_fd
);
604 dnet_io_cleanup_states(n
);
/* All threads have been joined above, so the lists are walked without the lock. */
606 list_for_each_entry_safe(r
, tmp
, &io
->recv_list
, req_entry
) {
607 list_del(&r
->req_entry
);
611 list_for_each_entry_safe(r
, tmp
, &io
->nonblocking_recv_list
, req_entry
) {
612 list_del(&r
->req_entry
);
616 pthread_mutex_destroy(&io
->recv_lock
);
617 pthread_cond_destroy(&io
->recv_wait
);