/*
 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
26 #include "elliptics.h"
27 #include "elliptics/interface.h"
29 static void dnet_schedule_io(struct dnet_node
*n
, struct dnet_io_req
*r
)
31 struct dnet_io
*io
= n
->io
;
32 struct dnet_cmd
*cmd
= r
->header
;
33 int nonblocking
= !!(cmd
->flags
& DNET_FLAGS_NOLOCK
);
35 if (cmd
->size
>= sizeof(struct dnet_attr
)) {
36 struct dnet_attr
*attr
= r
->header
+ sizeof(struct dnet_cmd
);
37 dnet_log(r
->st
->n
, DNET_LOG_DSA
, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
38 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), dnet_cmd_string(attr
->cmd
),
39 (unsigned long long)cmd
->size
, nonblocking
);
40 } else if ((cmd
->size
== 0) && !(cmd
->flags
& DNET_FLAGS_MORE
) && (cmd
->trans
& DNET_TRANS_REPLY
)) {
41 dnet_log(r
->st
->n
, DNET_LOG_DSA
, "%s: %s: RECV ACK: nonblocking: %d\n",
42 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), nonblocking
);
44 unsigned long long tid
= cmd
->trans
& ~DNET_TRANS_REPLY
;
45 int reply
= !!(cmd
->trans
& DNET_TRANS_REPLY
);
47 dnet_log(r
->st
->n
, DNET_LOG_DSA
, "%s: %s: RECV: nonblocking: %d, cmd-size: %llu, cmd-flags: %x, cmd-trans: %lld, reply: %d\n",
48 dnet_state_dump_addr(r
->st
), dnet_dump_id(r
->header
), nonblocking
,
49 (unsigned long long)cmd
->size
, cmd
->flags
, tid
, reply
);
52 pthread_mutex_lock(&io
->recv_lock
);
55 list_add_tail(&r
->req_entry
, &io
->nonblocking_recv_list
);
57 list_add_tail(&r
->req_entry
, &io
->recv_list
);
59 pthread_cond_broadcast(&io
->recv_wait
);
60 pthread_mutex_unlock(&io
->recv_lock
);
/*
 * dnet_schedule_command() - re-arm the receive side of @st for a command
 * header.
 *
 * Visible effects: st->rcv_flags is set to DNET_IO_CMD and st->rcv_end to
 * sizeof(struct dnet_cmd). A DNET_LOG_DSA message ("freed: ...") reports the
 * size and transaction of st->rcv_cmd together with the st->rcv_data pointer.
 *
 * NOTE(review): the embedded listing numbers skip (66-68, 73-77, 79-80), so
 * some statements are missing here - presumably the release of st->rcv_data
 * and a reset of the read offset; confirm against the complete source.
 */
63 void dnet_schedule_command(struct dnet_net_state
*st
)
65 st
->rcv_flags
= DNET_IO_CMD
;
69 struct dnet_cmd
*c
= &st
->rcv_cmd
;
/* Transaction id with the reply bit masked off. */
70 unsigned long long tid
= c
->trans
& ~DNET_TRANS_REPLY
;
71 dnet_log(st
->n
, DNET_LOG_DSA
, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
72 (unsigned long long)c
->size
, tid
, tid
!= c
->trans
, st
->rcv_data
);
78 st
->rcv_end
= sizeof(struct dnet_cmd
);
/*
 * dnet_process_recv_single() - advance the receive state machine of @st by
 * reading from st->read_s.
 *
 * What the visible lines establish:
 *  - the outstanding read is rcv_end - rcv_offset bytes, and every
 *    successful recv() advances rcv_offset by the number of bytes returned;
 *  - with DNET_IO_CMD set, one buffer of sizeof(struct dnet_io_req) +
 *    sizeof(struct dnet_cmd) + c->size bytes is allocated, its request part
 *    is zeroed, the received header is copied to r->header, and the read
 *    window is moved to the c->size payload bytes following the header in
 *    that same buffer; DNET_IO_CMD is then cleared;
 *  - a request takes a state reference (dnet_state_get) and is passed to
 *    dnet_schedule_io(), after dnet_schedule_command() re-arms the state;
 *  - for errors other than -EAGAIN / -EINTR the state is re-armed as well.
 *
 * NOTE(review): local declarations, labels, gotos, allocation-failure
 * handling and the return statements are not part of this listing (the
 * embedded numbers skip) - confirm the control flow against the full source.
 */
82 static int dnet_process_recv_single(struct dnet_net_state
*st
)
84 struct dnet_node
*n
= st
->n
;
85 struct dnet_io_req
*r
;
92 * Reading command first.
94 if (st
->rcv_flags
& DNET_IO_CMD
)
98 data
+= st
->rcv_offset
;
99 size
= st
->rcv_end
- st
->rcv_offset
;
/* Ask the socket for exactly the bytes still outstanding. */
102 err
= recv(st
->read_s
, data
, size
, 0);
105 if (errno
!= EAGAIN
&& errno
!= EINTR
) {
107 dnet_log_err(n
, "failed to receive data, socket: %d", st
->read_s
);
115 dnet_log(n
, DNET_LOG_ERROR
, "Peer %s has disconnected.\n",
116 dnet_server_convert_dnet_addr(&st
->addr
));
121 st
->rcv_offset
+= err
;
124 if (st
->rcv_offset
!= st
->rcv_end
)
127 if (st
->rcv_flags
& DNET_IO_CMD
) {
128 unsigned long long tid
;
129 struct dnet_cmd
*c
= &st
->rcv_cmd
;
133 tid
= c
->trans
& ~DNET_TRANS_REPLY
;
135 dnet_log(n
, DNET_LOG_DSA
, "%s: received trans: %llu / %llx, "
136 "reply: %d, size: %llu, flags: %x, status: %d.\n",
137 dnet_dump_id(&c
->id
), tid
, (unsigned long long)c
->trans
,
138 !!(c
->trans
& DNET_TRANS_REPLY
),
139 (unsigned long long)c
->size
, c
->flags
, c
->status
);
/* Request descriptor, command header and payload share one allocation. */
141 r
= malloc(c
->size
+ sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_req
));
146 memset(r
, 0, sizeof(struct dnet_io_req
));
149 r
->hsize
= sizeof(struct dnet_cmd
);
150 memcpy(r
->header
, &st
->rcv_cmd
, sizeof(struct dnet_cmd
));
/* Next read window: the payload area right behind the copied header. */
153 st
->rcv_offset
= sizeof(struct dnet_io_req
) + sizeof(struct dnet_cmd
);
154 st
->rcv_end
= st
->rcv_offset
+ c
->size
;
155 st
->rcv_flags
&= ~DNET_IO_CMD
;
158 r
->data
= r
->header
+ sizeof(struct dnet_cmd
);
162 * We read the command header, now get the data.
171 dnet_schedule_command(st
);
/* The queued request holds its own reference on the state. */
173 r
->st
= dnet_state_get(st
);
175 dnet_schedule_io(n
, r
);
179 if (err
!= -EAGAIN
&& err
!= -EINTR
)
180 dnet_schedule_command(st
);
/*
 * dnet_state_accept_process() - accept one client on the listening state
 * @orig (the epoll event argument is unused).
 *
 * Visible steps: accept() on orig->read_s into a zeroed struct dnet_addr,
 * dnet_set_sockopt() on the new socket, then dnet_state_create() for it with
 * dnet_state_net_process as its handler. A failed accept and a failed state
 * creation are logged; success is logged at DNET_LOG_INFO with the peer
 * address and socket number.
 *
 * NOTE(review): the conditions guarding the two error messages, the error
 * codes and the return statements are missing from this listing (embedded
 * numbers skip 196-198, 200-201, 206, 209-212, 215-216, 218-220) - confirm
 * against the complete source.
 */
185 int dnet_state_accept_process(struct dnet_net_state
*orig
, struct epoll_event
*ev __unused
)
187 struct dnet_node
*n
= orig
->n
;
189 struct dnet_addr addr
;
190 struct dnet_net_state
*st
;
192 memset(&addr
, 0, sizeof(addr
));
/* accept() takes the buffer capacity in addr_len and returns the real length there. */
194 addr
.addr_len
= sizeof(addr
.addr
);
195 cs
= accept(orig
->read_s
, (struct sockaddr
*)&addr
.addr
, &addr
.addr_len
);
199 dnet_log_err(n
, "failed to accept new client at %s", dnet_state_dump_addr(orig
));
203 dnet_set_sockopt(cs
);
205 st
= dnet_state_create(n
, 0, NULL
, 0, &addr
, cs
, &err
, 0, dnet_state_net_process
);
207 dnet_log(n
, DNET_LOG_ERROR
, "%s: Failed to create state for accepted client: %s [%d]\n",
208 dnet_server_convert_dnet_addr(&addr
), strerror(-err
), -err
);
213 dnet_log(n
, DNET_LOG_INFO
, "Accepted client %s, socket: %d.\n",
214 dnet_server_convert_dnet_addr(&addr
), cs
);
217 /* socket is closed in dnet_state_create() */
222 void dnet_unschedule_send(struct dnet_net_state
*st
)
224 struct epoll_event ev
;
226 ev
.events
= EPOLLOUT
;
229 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->write_s
, &ev
);
232 void dnet_unschedule_recv(struct dnet_net_state
*st
)
234 struct epoll_event ev
;
239 epoll_ctl(st
->epoll_fd
, EPOLL_CTL_DEL
, st
->read_s
, &ev
);
242 static int dnet_process_send_single(struct dnet_net_state
*st
)
244 struct dnet_io_req
*r
= NULL
;
250 pthread_mutex_lock(&st
->send_lock
);
251 if (!list_empty(&st
->send_list
)) {
252 r
= list_first_entry(&st
->send_list
, struct dnet_io_req
, req_entry
);
254 dnet_unschedule_send(st
);
256 pthread_mutex_unlock(&st
->send_lock
);
263 err
= dnet_send_request(st
, r
);
272 static int dnet_schedule_network_io(struct dnet_net_state
*st
, int send
)
274 struct epoll_event ev
;
278 ev
.events
= EPOLLOUT
;
286 err
= epoll_ctl(st
->epoll_fd
, EPOLL_CTL_ADD
, fd
, &ev
);
290 if (err
== -EEXIST
) {
293 dnet_log_err(st
->n
, "%s: failed to add %s event", dnet_state_dump_addr(st
), send
? "SEND" : "RECV");
/*
 * dnet_schedule_send() - start watching the write socket of @st.
 * Returns the result of dnet_schedule_network_io() in send mode.
 */
int dnet_schedule_send(struct dnet_net_state *st)
{
	return dnet_schedule_network_io(st, 1);
}
/*
 * dnet_schedule_recv() - start watching the read socket of @st.
 * Returns the result of dnet_schedule_network_io() in receive mode.
 */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	return dnet_schedule_network_io(st, 0);
}
310 int dnet_state_net_process(struct dnet_net_state
*st
, struct epoll_event
*ev
)
312 int err
= -ECONNRESET
;
314 if (ev
->events
& EPOLLIN
) {
315 err
= dnet_process_recv_single(st
);
316 if (err
&& (err
!= -EAGAIN
))
319 if (ev
->events
& EPOLLOUT
) {
320 err
= dnet_process_send_single(st
);
321 if (err
&& (err
!= -EAGAIN
))
325 if (ev
->events
& (EPOLLHUP
| EPOLLERR
)) {
326 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: received error event mask %x\n", dnet_state_dump_addr(st
), ev
->events
);
/*
 * dnet_io_process() - body of a network thread; @data_ is the thread's
 * struct dnet_net_io.
 *
 * Visible behaviour: names the thread "net_pool", then loops until
 * n->need_exit is set. Each pass waits for a single event on nio->epoll_fd
 * (epoll_wait with maxevents 1 and a timeout argument of 1000), stores the
 * thread's epoll descriptor in the state and calls st->process(st, &ev).
 * A negative result, or st->stall reaching DNET_DEFAULT_STALL_TRANSACTIONS,
 * leads to dnet_state_reset(). Afterwards transactions are taken off
 * st->trans_list under st->trans_lock, moved to a private list, marked with
 * status -ETIMEDOUT, logged and completed through t->complete().
 * The thread returns the address of n->need_exit.
 *
 * NOTE(review): several lines are missing from this listing (embedded
 * numbers skip) - among them how `st` is obtained from the event, what
 * follows each `if`, and what happens to a transaction after completion.
 * Confirm the control flow against the complete source.
 */
333 static void *dnet_io_process(void *data_
)
335 struct dnet_net_io
*nio
= data_
;
336 struct dnet_node
*n
= nio
->n
;
337 struct dnet_net_state
*st
;
338 struct epoll_event ev
;
340 struct dnet_trans
*t
, *tmp
;
342 struct list_head head
;
344 dnet_set_name("net_pool");
345 dnet_log(n
, DNET_LOG_NOTICE
, "Starting network processing thread.\n");
347 while (!n
->need_exit
) {
348 err
= epoll_wait(nio
->epoll_fd
, &ev
, 1, 1000);
355 if (err
== -EAGAIN
|| err
== -EINTR
)
358 dnet_log_err(n
, "Failed to wait for IO fds");
/* Record which epoll set this state is currently served from. */
364 st
->epoll_fd
= nio
->epoll_fd
;
368 err
= st
->process(st
, &ev
);
372 if (err
== -EAGAIN
&& st
->stall
< DNET_DEFAULT_STALL_TRANSACTIONS
)
375 if (err
< 0 || st
->stall
>= DNET_DEFAULT_STALL_TRANSACTIONS
) {
376 dnet_state_reset(st
);
/* Current time, compared below against each transaction's t->time. */
385 gettimeofday(&tv
, NULL
);
387 INIT_LIST_HEAD(&head
);
389 pthread_mutex_lock(&st
->trans_lock
);
390 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_list_entry
) {
391 if (t
->time
.tv_sec
>= tv
.tv_sec
)
394 dnet_trans_remove_nolock(&st
->trans_root
, t
);
395 list_move(&t
->trans_list_entry
, &head
);
397 pthread_mutex_unlock(&st
->trans_lock
);
/* Completion callbacks run after st->trans_lock has been released. */
399 list_for_each_entry_safe(t
, tmp
, &head
, trans_list_entry
) {
400 list_del_init(&t
->trans_list_entry
);
404 t
->cmd
.status
= -ETIMEDOUT
;
406 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: destructing trans: %llu on TIMEOUT\n",
407 dnet_state_dump_addr(st
), (unsigned long long)t
->trans
);
410 t
->complete(st
, &t
->cmd
, NULL
, t
->priv
);
416 dnet_log(n
, DNET_LOG_NOTICE
, "Exiting network processing thread: need_exit: %d, err: %d.\n", n
->need_exit
, err
);
417 return &n
->need_exit
;
420 static void dnet_io_cleanup_states(struct dnet_node
*n
)
422 struct dnet_net_state
*st
, *tmp
;
424 list_for_each_entry_safe(st
, tmp
, &n
->storage_state_list
, storage_state_entry
) {
425 dnet_state_reset(st
);
/*
 * NOTE(review): only the opening line of this structure is present in the
 * listing (embedded numbers skip 430-432); its members and closing brace are
 * not visible, and no code in this part of the file refers to it.
 */
429 struct dnet_io_process_data
{
/*
 * dnet_io_process_pool() - body of an IO worker thread; @data_ is the
 * thread's struct dnet_work_io.
 *
 * Visible behaviour: loops until n->need_exit is set. Each pass builds a
 * deadline of "now + 1 second" from gettimeofday(), takes io->recv_lock and
 * selects io->nonblocking_recv_list when wio->nonblocking is set,
 * io->recv_list otherwise. The head of the list is picked if there is one;
 * pthread_cond_timedwait() on io->recv_wait is followed by a second look at
 * the list. The picked request is unlinked, the lock is released and the
 * request is handed to dnet_process_recv().
 *
 * NOTE(review): the `else` that presumably guards the timed wait, the
 * handling of an empty result, the assignment of `st`, any release of the
 * request/state after processing and the return statement are missing from
 * this listing (embedded numbers skip) - confirm against the full source.
 */
434 static void *dnet_io_process_pool(void *data_
)
436 struct dnet_work_io
*wio
= data_
;
437 struct dnet_node
*n
= wio
->n
;
438 struct dnet_net_state
*st
;
439 struct dnet_io
*io
= n
->io
;
442 struct dnet_io_req
*r
;
443 struct list_head
*head
;
446 dnet_log(n
, DNET_LOG_NOTICE
, "Starting %s IO processing thread.\n", wio
->nonblocking
? "nonblocking" : "blocking");
447 dnet_set_name("io_pool");
449 while (!n
->need_exit
) {
/* Absolute deadline for the timed wait below: one second from now. */
453 gettimeofday(&tv
, NULL
);
454 ts
.tv_sec
= tv
.tv_sec
+ 1;
455 ts
.tv_nsec
= tv
.tv_usec
* 1000;
457 pthread_mutex_lock(&io
->recv_lock
);
458 head
= &io
->recv_list
;
460 if (wio
->nonblocking
)
461 head
= &io
->nonblocking_recv_list
;
463 if (!list_empty(head
)) {
464 r
= list_first_entry(head
, struct dnet_io_req
, req_entry
);
466 err
= pthread_cond_timedwait(&io
->recv_wait
, &io
->recv_lock
, &ts
);
467 if (!list_empty(head
)) {
468 r
= list_first_entry(head
, struct dnet_io_req
, req_entry
);
/* Unlink while io->recv_lock is still held. */
474 list_del_init(&r
->req_entry
);
475 pthread_mutex_unlock(&io
->recv_lock
);
482 dnet_log(n
, DNET_LOG_DSA
, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, nonblocking: %d\n",
483 dnet_state_dump_addr(st
), dnet_dump_id(r
->header
), r
, r
->hsize
, r
->dsize
, wio
->nonblocking
);
485 err
= dnet_process_recv(st
, r
);
491 dnet_log(n
, DNET_LOG_DSA
, "Exiting IO processing thread: need_exit: %d, err: %d.\n", n
->need_exit
, err
);
495 int dnet_io_init(struct dnet_node
*n
, struct dnet_config
*cfg
)
499 int io_size
= sizeof(struct dnet_io
) +
500 sizeof(struct dnet_net_io
) * cfg
->net_thread_num
+
501 sizeof(struct dnet_work_io
) * (cfg
->io_thread_num
+ cfg
->nonblocking_io_thread_num
);
503 io
= malloc(io_size
);
509 memset(io
, 0, io_size
);
511 io
->nonblocking_thread_num
= cfg
->nonblocking_io_thread_num
;
512 io
->thread_num
= cfg
->io_thread_num
;
513 io
->net_thread_num
= cfg
->net_thread_num
;
515 io
->net
= (struct dnet_net_io
*)(io
+ 1);
516 io
->wio
= (struct dnet_work_io
*)(io
->net
+ cfg
->net_thread_num
);
518 INIT_LIST_HEAD(&io
->recv_list
);
519 INIT_LIST_HEAD(&io
->nonblocking_recv_list
);
522 err
= pthread_cond_init(&io
->recv_wait
, NULL
);
525 dnet_log(n
, DNET_LOG_ERROR
, "Failed to initialize send cond: %d\n", err
);
529 err
= pthread_mutex_init(&io
->recv_lock
, NULL
);
532 dnet_log(n
, DNET_LOG_ERROR
, "Failed to initialize send lock: %d\n", err
);
533 goto err_out_recv_cond
;
536 for (i
=0; i
<io
->net_thread_num
; ++i
) {
537 struct dnet_net_io
*nio
= &io
->net
[i
];
541 nio
->epoll_fd
= epoll_create(10000);
542 if (nio
->epoll_fd
< 0) {
544 dnet_log_err(n
, "Failed to create epoll fd");
545 goto err_out_net_destroy
;
548 fcntl(nio
->epoll_fd
, F_SETFD
, FD_CLOEXEC
);
549 fcntl(nio
->epoll_fd
, F_SETFL
, O_NONBLOCK
);
551 err
= pthread_create(&nio
->tid
, NULL
, dnet_io_process
, nio
);
553 close(nio
->epoll_fd
);
555 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create network processing thread: %d\n", err
);
556 goto err_out_net_destroy
;
560 dnet_log(n
, DNET_LOG_INFO
, "Starting %d blocking threads and %d nonblocking threads\n", io
->thread_num
, io
->nonblocking_thread_num
);
561 for (i
=0; i
<io
->thread_num
+ io
->nonblocking_thread_num
; ++i
) {
562 struct dnet_work_io
*wio
= &io
->wio
[i
];
565 wio
->thread_index
= i
;
567 if (i
>= io
->thread_num
)
568 wio
->nonblocking
= 1;
570 err
= pthread_create(&wio
->tid
, NULL
, dnet_io_process_pool
, wio
);
573 dnet_log(n
, DNET_LOG_ERROR
, "Failed to create IO thread: %d\n", err
);
574 goto err_out_io_threads
;
582 pthread_join(io
->wio
[i
].tid
, NULL
);
584 i
= io
->net_thread_num
;
587 pthread_join(io
->net
[i
].tid
, NULL
);
588 close(io
->net
[i
].epoll_fd
);
591 pthread_mutex_destroy(&io
->recv_lock
);
593 pthread_cond_destroy(&io
->recv_wait
);
600 void dnet_io_exit(struct dnet_node
*n
)
602 struct dnet_io
*io
= n
->io
;
603 struct dnet_io_req
*r
, *tmp
;
608 for (i
=0; i
<io
->thread_num
+ io
->nonblocking_thread_num
; ++i
)
609 pthread_join(io
->wio
[i
].tid
, NULL
);
611 for (i
=0; i
<io
->net_thread_num
; ++i
) {
612 pthread_join(io
->net
[i
].tid
, NULL
);
613 close(io
->net
[i
].epoll_fd
);
616 dnet_io_cleanup_states(n
);
618 list_for_each_entry_safe(r
, tmp
, &io
->recv_list
, req_entry
) {
619 list_del(&r
->req_entry
);
623 list_for_each_entry_safe(r
, tmp
, &io
->nonblocking_recv_list
, req_entry
) {
624 list_del(&r
->req_entry
);
628 pthread_mutex_destroy(&io
->recv_lock
);
629 pthread_cond_destroy(&io
->recv_wait
);