More recv debug
[elliptics.git] / library / pool.c
blob197481fc3cc9913e90f5441e3a3deb667e8c0075
/*
 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
 * All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
#include "config.h"

#include <sys/stat.h>

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "elliptics.h"
#include "elliptics/interface.h"
29 static void dnet_schedule_io(struct dnet_node *n, struct dnet_io_req *r)
31 struct dnet_io *io = n->io;
32 struct dnet_cmd *cmd = r->header;
33 int nonblocking = !!(cmd->flags & DNET_FLAGS_NOLOCK);
35 if (cmd->size >= sizeof(struct dnet_attr)) {
36 struct dnet_attr *attr = r->header + sizeof(struct dnet_cmd);
37 dnet_log(r->st->n, DNET_LOG_DSA, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
38 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(attr->cmd),
39 (unsigned long long)cmd->size, nonblocking);
40 } else if ((cmd->size == 0) && !(cmd->flags & DNET_FLAGS_MORE) && (cmd->trans & DNET_TRANS_REPLY)) {
41 dnet_log(r->st->n, DNET_LOG_DSA, "%s: %s: RECV ACK: nonblocking: %d\n",
42 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), nonblocking);
43 } else {
44 unsigned long long tid = cmd->trans & ~DNET_TRANS_REPLY;
45 int reply = !!(cmd->trans & DNET_TRANS_REPLY);
47 dnet_log(r->st->n, DNET_LOG_DSA, "%s: %s: RECV: nonblocking: %d, cmd-size: %llu, cmd-flags: %x, cmd-trans: %lld, reply: %d\n",
48 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), nonblocking,
49 (unsigned long long)cmd->size, cmd->flags, tid, reply);
52 pthread_mutex_lock(&io->recv_lock);
54 if (nonblocking)
55 list_add_tail(&r->req_entry, &io->nonblocking_recv_list);
56 else
57 list_add_tail(&r->req_entry, &io->recv_list);
59 pthread_cond_broadcast(&io->recv_wait);
60 pthread_mutex_unlock(&io->recv_lock);
63 void dnet_schedule_command(struct dnet_net_state *st)
65 st->rcv_flags = DNET_IO_CMD;
67 if (st->rcv_data) {
68 #if 0
69 struct dnet_cmd *c = &st->rcv_cmd;
70 unsigned long long tid = c->trans & ~DNET_TRANS_REPLY;
71 dnet_log(st->n, DNET_LOG_DSA, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
72 (unsigned long long)c->size, tid, tid != c->trans, st->rcv_data);
73 #endif
74 free(st->rcv_data);
75 st->rcv_data = NULL;
78 st->rcv_end = sizeof(struct dnet_cmd);
79 st->rcv_offset = 0;
82 static int dnet_process_recv_single(struct dnet_net_state *st)
84 struct dnet_node *n = st->n;
85 struct dnet_io_req *r;
86 void *data;
87 uint64_t size;
88 int err;
90 again:
92 * Reading command first.
94 if (st->rcv_flags & DNET_IO_CMD)
95 data = &st->rcv_cmd;
96 else
97 data = st->rcv_data;
98 data += st->rcv_offset;
99 size = st->rcv_end - st->rcv_offset;
101 if (size) {
102 err = recv(st->read_s, data, size, 0);
103 if (err < 0) {
104 err = -EAGAIN;
105 if (errno != EAGAIN && errno != EINTR) {
106 err = -errno;
107 dnet_log_err(n, "failed to receive data, socket: %d", st->read_s);
108 goto out;
111 goto out;
114 if (err == 0) {
115 dnet_log(n, DNET_LOG_ERROR, "Peer %s has disconnected.\n",
116 dnet_server_convert_dnet_addr(&st->addr));
117 err = -ECONNRESET;
118 goto out;
121 st->rcv_offset += err;
124 if (st->rcv_offset != st->rcv_end)
125 goto again;
127 if (st->rcv_flags & DNET_IO_CMD) {
128 unsigned long long tid;
129 struct dnet_cmd *c = &st->rcv_cmd;
131 dnet_convert_cmd(c);
133 tid = c->trans & ~DNET_TRANS_REPLY;
135 dnet_log(n, DNET_LOG_DSA, "%s: received trans: %llu / %llx, "
136 "reply: %d, size: %llu, flags: %x, status: %d.\n",
137 dnet_dump_id(&c->id), tid, (unsigned long long)c->trans,
138 !!(c->trans & DNET_TRANS_REPLY),
139 (unsigned long long)c->size, c->flags, c->status);
141 r = malloc(c->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_io_req));
142 if (!r) {
143 err = -ENOMEM;
144 goto out;
146 memset(r, 0, sizeof(struct dnet_io_req));
148 r->header = r + 1;
149 r->hsize = sizeof(struct dnet_cmd);
150 memcpy(r->header, &st->rcv_cmd, sizeof(struct dnet_cmd));
152 st->rcv_data = r;
153 st->rcv_offset = sizeof(struct dnet_io_req) + sizeof(struct dnet_cmd);
154 st->rcv_end = st->rcv_offset + c->size;
155 st->rcv_flags &= ~DNET_IO_CMD;
157 if (c->size) {
158 r->data = r->header + sizeof(struct dnet_cmd);
159 r->dsize = c->size;
162 * We read the command header, now get the data.
164 goto again;
168 r = st->rcv_data;
169 st->rcv_data = NULL;
171 dnet_schedule_command(st);
173 r->st = dnet_state_get(st);
175 dnet_schedule_io(n, r);
176 return 0;
178 out:
179 if (err != -EAGAIN && err != -EINTR)
180 dnet_schedule_command(st);
182 return err;
185 int dnet_state_accept_process(struct dnet_net_state *orig, struct epoll_event *ev __unused)
187 struct dnet_node *n = orig->n;
188 int err, cs;
189 struct dnet_addr addr;
190 struct dnet_net_state *st;
192 memset(&addr, 0, sizeof(addr));
194 addr.addr_len = sizeof(addr.addr);
195 cs = accept(orig->read_s, (struct sockaddr *)&addr.addr, &addr.addr_len);
196 if (cs <= 0) {
197 err = -errno;
198 if (err != -EAGAIN)
199 dnet_log_err(n, "failed to accept new client at %s", dnet_state_dump_addr(orig));
200 goto err_out_exit;
203 dnet_set_sockopt(cs);
205 st = dnet_state_create(n, 0, NULL, 0, &addr, cs, &err, 0, dnet_state_net_process);
206 if (!st) {
207 dnet_log(n, DNET_LOG_ERROR, "%s: Failed to create state for accepted client: %s [%d]\n",
208 dnet_server_convert_dnet_addr(&addr), strerror(-err), -err);
209 err = -EAGAIN;
210 goto err_out_exit;
213 dnet_log(n, DNET_LOG_INFO, "Accepted client %s, socket: %d.\n",
214 dnet_server_convert_dnet_addr(&addr), cs);
216 return 0;
217 /* socket is closed in dnet_state_create() */
218 err_out_exit:
219 return err;
222 void dnet_unschedule_send(struct dnet_net_state *st)
224 struct epoll_event ev;
226 ev.events = EPOLLOUT;
227 ev.data.ptr = st;
229 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->write_s, &ev);
232 void dnet_unschedule_recv(struct dnet_net_state *st)
234 struct epoll_event ev;
236 ev.events = EPOLLIN;
237 ev.data.ptr = st;
239 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->read_s, &ev);
242 static int dnet_process_send_single(struct dnet_net_state *st)
244 struct dnet_io_req *r = NULL;
245 int err;
247 while (1) {
248 r = NULL;
250 pthread_mutex_lock(&st->send_lock);
251 if (!list_empty(&st->send_list)) {
252 r = list_first_entry(&st->send_list, struct dnet_io_req, req_entry);
253 } else {
254 dnet_unschedule_send(st);
256 pthread_mutex_unlock(&st->send_lock);
258 if (!r) {
259 err = -EAGAIN;
260 goto err_out_exit;
263 err = dnet_send_request(st, r);
264 if (err)
265 goto err_out_exit;
268 err_out_exit:
269 return err;
272 static int dnet_schedule_network_io(struct dnet_net_state *st, int send)
274 struct epoll_event ev;
275 int err, fd;
277 if (send) {
278 ev.events = EPOLLOUT;
279 fd = st->write_s;
280 } else {
281 ev.events = EPOLLIN;
282 fd = st->read_s;
284 ev.data.ptr = st;
286 err = epoll_ctl(st->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
287 if (err < 0) {
288 err = -errno;
290 if (err == -EEXIST) {
291 err = 0;
292 } else {
293 dnet_log_err(st->n, "%s: failed to add %s event", dnet_state_dump_addr(st), send ? "SEND" : "RECV");
297 return err;
/* Ask the event loop to report when st->write_s becomes writable. */
int dnet_schedule_send(struct dnet_net_state *st)
{
	const int want_send = 1;

	return dnet_schedule_network_io(st, want_send);
}
/* Ask the event loop to report when st->read_s becomes readable. */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	const int want_send = 0;

	return dnet_schedule_network_io(st, want_send);
}
310 int dnet_state_net_process(struct dnet_net_state *st, struct epoll_event *ev)
312 int err = -ECONNRESET;
314 if (ev->events & EPOLLIN) {
315 err = dnet_process_recv_single(st);
316 if (err && (err != -EAGAIN))
317 goto err_out_exit;
319 if (ev->events & EPOLLOUT) {
320 err = dnet_process_send_single(st);
321 if (err && (err != -EAGAIN))
322 goto err_out_exit;
325 if (ev->events & (EPOLLHUP | EPOLLERR)) {
326 dnet_log(st->n, DNET_LOG_ERROR, "%s: received error event mask %x\n", dnet_state_dump_addr(st), ev->events);
327 err = -ECONNRESET;
329 err_out_exit:
330 return err;
333 static void *dnet_io_process(void *data_)
335 struct dnet_net_io *nio = data_;
336 struct dnet_node *n = nio->n;
337 struct dnet_net_state *st;
338 struct epoll_event ev;
339 int err = 0, check;
340 struct dnet_trans *t, *tmp;
341 struct timeval tv;
342 struct list_head head;
344 dnet_set_name("net_pool");
345 dnet_log(n, DNET_LOG_NOTICE, "Starting network processing thread.\n");
347 while (!n->need_exit) {
348 err = epoll_wait(nio->epoll_fd, &ev, 1, 1000);
349 if (err == 0)
350 continue;
352 if (err < 0) {
353 err = -errno;
355 if (err == -EAGAIN || err == -EINTR)
356 continue;
358 dnet_log_err(n, "Failed to wait for IO fds");
359 n->need_exit = err;
360 break;
363 st = ev.data.ptr;
364 st->epoll_fd = nio->epoll_fd;
365 check = st->stall;
367 while (1) {
368 err = st->process(st, &ev);
369 if (err == 0)
370 continue;
372 if (err == -EAGAIN && st->stall < DNET_DEFAULT_STALL_TRANSACTIONS)
373 break;
375 if (err < 0 || st->stall >= DNET_DEFAULT_STALL_TRANSACTIONS) {
376 dnet_state_reset(st);
377 check = 0;
378 break;
382 if (!check)
383 continue;
385 gettimeofday(&tv, NULL);
387 INIT_LIST_HEAD(&head);
389 pthread_mutex_lock(&st->trans_lock);
390 list_for_each_entry_safe(t, tmp, &st->trans_list, trans_list_entry) {
391 if (t->time.tv_sec >= tv.tv_sec)
392 break;
394 dnet_trans_remove_nolock(&st->trans_root, t);
395 list_move(&t->trans_list_entry, &head);
397 pthread_mutex_unlock(&st->trans_lock);
399 list_for_each_entry_safe(t, tmp, &head, trans_list_entry) {
400 list_del_init(&t->trans_list_entry);
402 t->cmd.flags = 0;
403 t->cmd.size = 0;
404 t->cmd.status = -ETIMEDOUT;
406 dnet_log(st->n, DNET_LOG_ERROR, "%s: destructing trans: %llu on TIMEOUT\n",
407 dnet_state_dump_addr(st), (unsigned long long)t->trans);
409 if (t->complete)
410 t->complete(st, &t->cmd, NULL, t->priv);
412 dnet_trans_put(t);
416 dnet_log(n, DNET_LOG_NOTICE, "Exiting network processing thread: need_exit: %d, err: %d.\n", n->need_exit, err);
417 return &n->need_exit;
420 static void dnet_io_cleanup_states(struct dnet_node *n)
422 struct dnet_net_state *st, *tmp;
424 list_for_each_entry_safe(st, tmp, &n->storage_state_list, storage_state_entry) {
425 dnet_state_reset(st);
/*
 * Thread argument bundle: owning node plus a thread number.
 * NOTE(review): nothing in this file uses this type (the IO threads take a
 * struct dnet_work_io); confirm it is unused elsewhere before removing.
 */
struct dnet_io_process_data {
	struct dnet_node	*n;		/* node the thread belongs to */
	int			thread_number;	/* presumably the thread's index */
};
434 static void *dnet_io_process_pool(void *data_)
436 struct dnet_work_io *wio = data_;
437 struct dnet_node *n = wio->n;
438 struct dnet_net_state *st;
439 struct dnet_io *io = n->io;
440 struct timespec ts;
441 struct timeval tv;
442 struct dnet_io_req *r;
443 struct list_head *head;
444 int err = 0;
446 dnet_log(n, DNET_LOG_NOTICE, "Starting %s IO processing thread.\n", wio->nonblocking ? "nonblocking" : "blocking");
447 dnet_set_name("io_pool");
449 while (!n->need_exit) {
450 r = NULL;
451 err = 0;
453 gettimeofday(&tv, NULL);
454 ts.tv_sec = tv.tv_sec + 1;
455 ts.tv_nsec = tv.tv_usec * 1000;
457 pthread_mutex_lock(&io->recv_lock);
458 head = &io->recv_list;
460 if (wio->nonblocking)
461 head = &io->nonblocking_recv_list;
463 if (!list_empty(head)) {
464 r = list_first_entry(head, struct dnet_io_req, req_entry);
465 } else {
466 err = pthread_cond_timedwait(&io->recv_wait, &io->recv_lock, &ts);
467 if (!list_empty(head)) {
468 r = list_first_entry(head, struct dnet_io_req, req_entry);
469 err = 0;
473 if (r)
474 list_del_init(&r->req_entry);
475 pthread_mutex_unlock(&io->recv_lock);
477 if (!r)
478 continue;
480 st = r->st;
482 dnet_log(n, DNET_LOG_DSA, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, nonblocking: %d\n",
483 dnet_state_dump_addr(st), dnet_dump_id(r->header), r, r->hsize, r->dsize, wio->nonblocking);
485 err = dnet_process_recv(st, r);
487 dnet_io_req_free(r);
488 dnet_state_put(st);
491 dnet_log(n, DNET_LOG_DSA, "Exiting IO processing thread: need_exit: %d, err: %d.\n", n->need_exit, err);
492 return NULL;
495 int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg)
497 int err, i;
498 struct dnet_io *io;
499 int io_size = sizeof(struct dnet_io) +
500 sizeof(struct dnet_net_io) * cfg->net_thread_num +
501 sizeof(struct dnet_work_io) * (cfg->io_thread_num + cfg->nonblocking_io_thread_num);
503 io = malloc(io_size);
504 if (!io) {
505 err = -ENOMEM;
506 goto err_out_exit;
509 memset(io, 0, io_size);
511 io->nonblocking_thread_num = cfg->nonblocking_io_thread_num;
512 io->thread_num = cfg->io_thread_num;
513 io->net_thread_num = cfg->net_thread_num;
515 io->net = (struct dnet_net_io *)(io + 1);
516 io->wio = (struct dnet_work_io *)(io->net + cfg->net_thread_num);
518 INIT_LIST_HEAD(&io->recv_list);
519 INIT_LIST_HEAD(&io->nonblocking_recv_list);
520 n->io = io;
522 err = pthread_cond_init(&io->recv_wait, NULL);
523 if (err) {
524 err = -err;
525 dnet_log(n, DNET_LOG_ERROR, "Failed to initialize send cond: %d\n", err);
526 goto err_out_free;
529 err = pthread_mutex_init(&io->recv_lock, NULL);
530 if (err) {
531 err = -err;
532 dnet_log(n, DNET_LOG_ERROR, "Failed to initialize send lock: %d\n", err);
533 goto err_out_recv_cond;
536 for (i=0; i<io->net_thread_num; ++i) {
537 struct dnet_net_io *nio = &io->net[i];
539 nio->n = n;
541 nio->epoll_fd = epoll_create(10000);
542 if (nio->epoll_fd < 0) {
543 err = -errno;
544 dnet_log_err(n, "Failed to create epoll fd");
545 goto err_out_net_destroy;
548 fcntl(nio->epoll_fd, F_SETFD, FD_CLOEXEC);
549 fcntl(nio->epoll_fd, F_SETFL, O_NONBLOCK);
551 err = pthread_create(&nio->tid, NULL, dnet_io_process, nio);
552 if (err) {
553 close(nio->epoll_fd);
554 err = -err;
555 dnet_log(n, DNET_LOG_ERROR, "Failed to create network processing thread: %d\n", err);
556 goto err_out_net_destroy;
560 dnet_log(n, DNET_LOG_INFO, "Starting %d blocking threads and %d nonblocking threads\n", io->thread_num, io->nonblocking_thread_num);
561 for (i=0; i<io->thread_num + io->nonblocking_thread_num; ++i) {
562 struct dnet_work_io *wio = &io->wio[i];
564 wio->n = n;
565 wio->thread_index = i;
567 if (i >= io->thread_num)
568 wio->nonblocking = 1;
570 err = pthread_create(&wio->tid, NULL, dnet_io_process_pool, wio);
571 if (err) {
572 err = -err;
573 dnet_log(n, DNET_LOG_ERROR, "Failed to create IO thread: %d\n", err);
574 goto err_out_io_threads;
578 return 0;
580 err_out_io_threads:
581 while (--i >= 0)
582 pthread_join(io->wio[i].tid, NULL);
584 i = io->net_thread_num;
585 err_out_net_destroy:
586 while (--i >= 0) {
587 pthread_join(io->net[i].tid, NULL);
588 close(io->net[i].epoll_fd);
591 pthread_mutex_destroy(&io->recv_lock);
592 err_out_recv_cond:
593 pthread_cond_destroy(&io->recv_wait);
594 err_out_free:
595 free(io);
596 err_out_exit:
597 return err;
600 void dnet_io_exit(struct dnet_node *n)
602 struct dnet_io *io = n->io;
603 struct dnet_io_req *r, *tmp;
604 int i;
606 n->need_exit = 1;
608 for (i=0; i<io->thread_num + io->nonblocking_thread_num; ++i)
609 pthread_join(io->wio[i].tid, NULL);
611 for (i=0; i<io->net_thread_num; ++i) {
612 pthread_join(io->net[i].tid, NULL);
613 close(io->net[i].epoll_fd);
616 dnet_io_cleanup_states(n);
618 list_for_each_entry_safe(r, tmp, &io->recv_list, req_entry) {
619 list_del(&r->req_entry);
620 dnet_io_req_free(r);
623 list_for_each_entry_safe(r, tmp, &io->nonblocking_recv_list, req_entry) {
624 list_del(&r->req_entry);
625 dnet_io_req_free(r);
628 pthread_mutex_destroy(&io->recv_lock);
629 pthread_cond_destroy(&io->recv_wait);
631 free(io);