dnet_remove_object_raw() should return error if it failed and dnet_trans_create_send_...
[elliptics.git] / library / pool.c
bloba09bbbd615f491aff6c354636a040e9432bfac1a
1 /*
2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/stat.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <signal.h>
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
27 static char *dnet_work_io_mode_string[] = {
28 [DNET_WORK_IO_MODE_BLOCKING] = "BLOCKING",
29 [DNET_WORK_IO_MODE_NONBLOCKING] = "NONBLOCKING",
32 static char *dnet_work_io_mode_str(int mode)
34 if (mode < 0 || mode >= (int)ARRAY_SIZE(dnet_work_io_mode_string))
35 return NULL;
37 return dnet_work_io_mode_string[mode];
40 static void dnet_work_pool_cleanup(struct dnet_work_pool *pool)
42 struct dnet_io_req *r, *tmp;
43 struct dnet_work_io *wio, *wio_tmp;
45 list_for_each_entry_safe(wio, wio_tmp, &pool->wio_list, wio_entry) {
46 pthread_join(wio->tid, NULL);
47 list_del(&wio->wio_entry);
48 free(wio);
52 list_for_each_entry_safe(r, tmp, &pool->list, req_entry) {
53 list_del(&r->req_entry);
54 dnet_io_req_free(r);
57 pthread_cond_destroy(&pool->wait);
58 pthread_mutex_destroy(&pool->lock);
59 free(pool);
62 static int dnet_work_pool_grow(struct dnet_node *n, struct dnet_work_pool *pool, int num, void *(* process)(void *))
64 int i, err;
65 struct dnet_work_io *wio, *tmp;
67 pthread_mutex_lock(&pool->lock);
69 for (i = 0; i < num; ++i) {
70 wio = malloc(sizeof(struct dnet_work_io));
71 if (!wio) {
72 err = -ENOMEM;
73 goto err_out_io_threads;
76 wio->thread_index = i;
77 wio->pool = pool;
78 list_add_tail(&wio->wio_entry, &pool->wio_list);
80 err = pthread_create(&wio->tid, NULL, process, wio);
81 if (err) {
82 err = -err;
83 dnet_log(n, DNET_LOG_ERROR, "Failed to create IO thread: %d\n", err);
84 goto err_out_io_threads;
88 dnet_log(n, DNET_LOG_INFO, "Grew %s pool by: %d -> %d IO threads\n",
89 dnet_work_io_mode_str(pool->mode), pool->num, pool->num + num);
91 atomic_add(&pool->avail, num);
92 pool->num += num;
93 pthread_mutex_unlock(&pool->lock);
95 return 0;
97 err_out_io_threads:
98 list_for_each_entry_safe(wio, tmp, &pool->wio_list, wio_entry) {
99 pthread_join(wio->tid, NULL);
100 list_del(&wio->wio_entry);
101 free(wio);
104 pthread_mutex_unlock(&pool->lock);
106 return err;
109 static struct dnet_work_pool *dnet_work_pool_alloc(struct dnet_node *n, int num, int mode, void *(* process)(void *))
111 struct dnet_work_pool *pool;
112 int err;
114 pool = malloc(sizeof(struct dnet_work_pool));
115 if (!pool) {
116 err = -ENOMEM;
117 goto err_out_exit;
120 memset(pool, 0, sizeof(struct dnet_work_pool));
122 pool->num = 0;
123 atomic_set(&pool->avail, 0);
124 pool->mode = mode;
125 pool->n = n;
126 INIT_LIST_HEAD(&pool->list);
127 INIT_LIST_HEAD(&pool->wio_list);
129 err = pthread_mutex_init(&pool->lock, NULL);
130 if (err) {
131 err = -err;
132 goto err_out_free;
135 err = pthread_cond_init(&pool->wait, NULL);
136 if (err) {
137 err = -err;
138 goto err_out_mutex_destroy;
141 err = dnet_work_pool_grow(n, pool, num, process);
142 if (err)
143 goto err_out_cond_destroy;
145 return pool;
147 err_out_cond_destroy:
148 pthread_cond_destroy(&pool->wait);
149 err_out_mutex_destroy:
150 pthread_mutex_destroy(&pool->lock);
151 err_out_free:
152 free(pool);
153 err_out_exit:
154 return NULL;
157 static void *dnet_io_process(void *data_);
158 static void dnet_schedule_io(struct dnet_node *n, struct dnet_io_req *r)
160 struct dnet_io *io = n->io;
161 struct dnet_cmd *cmd = r->header;
162 int nonblocking = !!(cmd->flags & DNET_FLAGS_NOLOCK);
163 struct dnet_work_pool *pool = io->recv_pool;
165 if (cmd->size > 0) {
166 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
167 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd),
168 (unsigned long long)cmd->size, nonblocking);
169 } else if ((cmd->size == 0) && !(cmd->flags & DNET_FLAGS_MORE) && (cmd->trans & DNET_TRANS_REPLY)) {
170 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV ACK: %s: nonblocking: %d\n",
171 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd), nonblocking);
172 } else {
173 unsigned long long tid = cmd->trans & ~DNET_TRANS_REPLY;
174 int reply = !!(cmd->trans & DNET_TRANS_REPLY);
176 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV: %s: nonblocking: %d, cmd-size: %llu, cflags: %llx, trans: %lld, reply: %d\n",
177 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd), nonblocking,
178 (unsigned long long)cmd->size, (unsigned long long)cmd->flags, tid, reply);
182 if (nonblocking)
183 pool = io->recv_pool_nb;
185 #define cmd_is_exec_match(__cmd) (((__cmd)->cmd == DNET_CMD_EXEC) && ((__cmd)->size >= sizeof(struct sph)) && !((__cmd)->trans & DNET_TRANS_REPLY))
187 if (!list_empty(&pool->list) && cmd_is_exec_match(cmd)) {
188 int pool_has_blocked_sph = 0;
189 struct dnet_io_req *tmp;
190 struct sph *sph;
192 pthread_mutex_lock(&pool->lock);
193 list_for_each_entry(tmp, &pool->list, req_entry) {
194 struct dnet_cmd *tmp_cmd = tmp->header;
195 unsigned long long tid = tmp_cmd->trans & ~DNET_TRANS_REPLY;
196 int reply = !!(tmp_cmd->trans & DNET_TRANS_REPLY);
197 unsigned long long sph_flags = 0;
198 int sph_match = 0;
200 if (cmd_is_exec_match(tmp_cmd)) {
201 sph = (struct sph *)tmp->data;
202 sph_flags = sph->flags;
203 sph_match = 1;
207 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: pool-grow: %s: cmd-size: %llu, cflags: %llx, "
208 "trans: %lld, reply: %d, sph-flags: %llx (match: %d), avail: %d\n",
209 dnet_state_dump_addr(tmp->st), dnet_dump_id(tmp->header), dnet_cmd_string(tmp_cmd->cmd),
210 (unsigned long long)tmp_cmd->size, (unsigned long long)tmp_cmd->flags,
211 tid, reply, sph_flags, sph_match, atomic_read(&pool->avail));
213 if (cmd_is_exec_match(tmp_cmd)) {
214 sph = (struct sph *)tmp->data;
215 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK) {
216 pool_has_blocked_sph = 1;
217 break;
221 pthread_mutex_unlock(&pool->lock);
223 sph = (struct sph *)r->data;
224 if ((sph->flags & DNET_SPH_FLAGS_SRC_BLOCK) && pool_has_blocked_sph && (atomic_read(&pool->avail) == 0)) {
225 dnet_work_pool_grow(n, pool, pool->num/4+1, dnet_io_process);
229 pthread_mutex_lock(&pool->lock);
230 list_add_tail(&r->req_entry, &pool->list);
231 pthread_cond_broadcast(&pool->wait);
232 pthread_mutex_unlock(&pool->lock);
236 void dnet_schedule_command(struct dnet_net_state *st)
238 st->rcv_flags = DNET_IO_CMD;
240 if (st->rcv_data) {
241 #if 0
242 struct dnet_cmd *c = &st->rcv_cmd;
243 unsigned long long tid = c->trans & ~DNET_TRANS_REPLY;
244 dnet_log(st->n, DNET_LOG_DEBUG, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
245 (unsigned long long)c->size, tid, tid != c->trans, st->rcv_data);
246 #endif
247 free(st->rcv_data);
248 st->rcv_data = NULL;
251 st->rcv_end = sizeof(struct dnet_cmd);
252 st->rcv_offset = 0;
255 static int dnet_process_recv_single(struct dnet_net_state *st)
257 struct dnet_node *n = st->n;
258 struct dnet_io_req *r;
259 void *data;
260 uint64_t size;
261 int err;
263 again:
265 * Reading command first.
267 if (st->rcv_flags & DNET_IO_CMD)
268 data = &st->rcv_cmd;
269 else
270 data = st->rcv_data;
271 data += st->rcv_offset;
272 size = st->rcv_end - st->rcv_offset;
274 if (size) {
275 err = recv(st->read_s, data, size, 0);
276 if (err < 0) {
277 err = -EAGAIN;
278 if (errno != EAGAIN && errno != EINTR) {
279 err = -errno;
280 dnet_log_err(n, "failed to receive data, socket: %d", st->read_s);
281 goto out;
284 goto out;
287 if (err == 0) {
288 dnet_log(n, DNET_LOG_ERROR, "Peer %s has disconnected.\n",
289 dnet_server_convert_dnet_addr(&st->addr));
290 err = -ECONNRESET;
291 goto out;
294 st->rcv_offset += err;
297 if (st->rcv_offset != st->rcv_end)
298 goto again;
300 if (st->rcv_flags & DNET_IO_CMD) {
301 unsigned long long tid;
302 struct dnet_cmd *c = &st->rcv_cmd;
304 dnet_convert_cmd(c);
306 tid = c->trans & ~DNET_TRANS_REPLY;
308 dnet_log(n, DNET_LOG_DEBUG, "%s: received trans: %llu / %llx, "
309 "reply: %d, size: %llu, flags: %llx, status: %d.\n",
310 dnet_dump_id(&c->id), tid, (unsigned long long)c->trans,
311 !!(c->trans & DNET_TRANS_REPLY),
312 (unsigned long long)c->size, (unsigned long long)c->flags, c->status);
314 r = malloc(c->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_io_req));
315 if (!r) {
316 err = -ENOMEM;
317 goto out;
319 memset(r, 0, sizeof(struct dnet_io_req));
321 r->header = r + 1;
322 r->hsize = sizeof(struct dnet_cmd);
323 memcpy(r->header, &st->rcv_cmd, sizeof(struct dnet_cmd));
325 st->rcv_data = r;
326 st->rcv_offset = sizeof(struct dnet_io_req) + sizeof(struct dnet_cmd);
327 st->rcv_end = st->rcv_offset + c->size;
328 st->rcv_flags &= ~DNET_IO_CMD;
330 if (c->size) {
331 r->data = r->header + sizeof(struct dnet_cmd);
332 r->dsize = c->size;
335 * We read the command header, now get the data.
337 goto again;
341 r = st->rcv_data;
342 st->rcv_data = NULL;
344 dnet_schedule_command(st);
346 r->st = dnet_state_get(st);
348 dnet_schedule_io(n, r);
349 return 0;
351 out:
352 if (err != -EAGAIN && err != -EINTR)
353 dnet_schedule_command(st);
355 return err;
358 int dnet_state_accept_process(struct dnet_net_state *orig, struct epoll_event *ev __unused)
360 struct dnet_node *n = orig->n;
361 int err, cs;
362 struct dnet_addr addr;
363 struct dnet_net_state *st;
365 memset(&addr, 0, sizeof(addr));
367 addr.addr_len = sizeof(addr.addr);
368 cs = accept(orig->read_s, (struct sockaddr *)&addr.addr, &addr.addr_len);
369 if (cs <= 0) {
370 err = -errno;
371 if (err != -EAGAIN)
372 dnet_log_err(n, "failed to accept new client at %s", dnet_state_dump_addr(orig));
373 goto err_out_exit;
376 dnet_set_sockopt(cs);
378 st = dnet_state_create(n, 0, NULL, 0, &addr, cs, &err, 0, dnet_state_net_process);
379 if (!st) {
380 dnet_log(n, DNET_LOG_ERROR, "%s: Failed to create state for accepted client: %s [%d]\n",
381 dnet_server_convert_dnet_addr(&addr), strerror(-err), -err);
382 err = -EAGAIN;
383 goto err_out_exit;
386 dnet_log(n, DNET_LOG_INFO, "Accepted client %s, socket: %d.\n",
387 dnet_server_convert_dnet_addr(&addr), cs);
389 return 0;
390 /* socket is closed in dnet_state_create() */
391 err_out_exit:
392 return err;
395 void dnet_unschedule_send(struct dnet_net_state *st)
397 struct epoll_event ev;
399 ev.events = EPOLLOUT;
400 ev.data.ptr = st;
402 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->write_s, &ev);
405 void dnet_unschedule_recv(struct dnet_net_state *st)
407 struct epoll_event ev;
409 ev.events = EPOLLIN;
410 ev.data.ptr = st;
412 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->read_s, &ev);
415 static int dnet_process_send_single(struct dnet_net_state *st)
417 struct dnet_io_req *r = NULL;
418 int err;
420 while (1) {
421 r = NULL;
423 pthread_mutex_lock(&st->send_lock);
424 if (!list_empty(&st->send_list)) {
425 r = list_first_entry(&st->send_list, struct dnet_io_req, req_entry);
426 } else {
427 dnet_unschedule_send(st);
429 pthread_mutex_unlock(&st->send_lock);
431 if (!r) {
432 err = -EAGAIN;
433 goto err_out_exit;
436 err = dnet_send_request(st, r);
437 if (err)
438 goto err_out_exit;
441 err_out_exit:
442 return err;
445 static int dnet_schedule_network_io(struct dnet_net_state *st, int send)
447 struct epoll_event ev;
448 int err, fd;
450 if (send) {
451 ev.events = EPOLLOUT;
452 fd = st->write_s;
453 } else {
454 ev.events = EPOLLIN;
455 fd = st->read_s;
457 ev.data.ptr = st;
459 err = epoll_ctl(st->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
460 if (err < 0) {
461 err = -errno;
463 if (err == -EEXIST) {
464 err = 0;
465 } else {
466 dnet_log_err(st->n, "%s: failed to add %s event", dnet_state_dump_addr(st), send ? "SEND" : "RECV");
470 return err;
/*
 * Start polling the write socket of @st for EPOLLOUT.
 * Returns whatever dnet_schedule_network_io() returns.
 */
int dnet_schedule_send(struct dnet_net_state *st)
{
	const int want_send = 1;

	return dnet_schedule_network_io(st, want_send);
}
/*
 * Start polling the read socket of @st for EPOLLIN.
 * Returns whatever dnet_schedule_network_io() returns.
 */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	const int want_send = 0;

	return dnet_schedule_network_io(st, want_send);
}
483 int dnet_state_net_process(struct dnet_net_state *st, struct epoll_event *ev)
485 int err = -ECONNRESET;
487 if (ev->events & EPOLLIN) {
488 err = dnet_process_recv_single(st);
489 if (err && (err != -EAGAIN))
490 goto err_out_exit;
492 if (ev->events & EPOLLOUT) {
493 err = dnet_process_send_single(st);
494 if (err && (err != -EAGAIN))
495 goto err_out_exit;
498 if (ev->events & (EPOLLHUP | EPOLLERR)) {
499 dnet_log(st->n, DNET_LOG_ERROR, "%s: received error event mask %x\n", dnet_state_dump_addr(st), ev->events);
500 err = -ECONNRESET;
502 err_out_exit:
503 return err;
506 static void *dnet_io_process_network(void *data_)
508 struct dnet_net_io *nio = data_;
509 struct dnet_node *n = nio->n;
510 struct dnet_net_state *st;
511 struct epoll_event ev;
512 int err = 0, check;
513 struct dnet_trans *t, *tmp;
514 struct timeval tv;
515 struct list_head head;
517 dnet_set_name("net_pool");
519 while (!n->need_exit) {
520 err = epoll_wait(nio->epoll_fd, &ev, 1, 1000);
521 if (err == 0)
522 continue;
524 if (err < 0) {
525 err = -errno;
527 if (err == -EAGAIN || err == -EINTR)
528 continue;
530 dnet_log_err(n, "Failed to wait for IO fds");
531 n->need_exit = err;
532 break;
535 st = ev.data.ptr;
536 st->epoll_fd = nio->epoll_fd;
537 check = st->stall;
539 while (1) {
540 err = st->process(st, &ev);
541 if (err == 0)
542 continue;
544 if (err == -EAGAIN && st->stall < DNET_DEFAULT_STALL_TRANSACTIONS)
545 break;
547 if (err < 0 || st->stall >= DNET_DEFAULT_STALL_TRANSACTIONS) {
548 dnet_state_reset(st);
549 check = 0;
550 break;
554 if (!check)
555 continue;
557 gettimeofday(&tv, NULL);
559 INIT_LIST_HEAD(&head);
561 pthread_mutex_lock(&st->trans_lock);
562 list_for_each_entry_safe(t, tmp, &st->trans_list, trans_list_entry) {
563 if (t->time.tv_sec >= tv.tv_sec)
564 break;
566 dnet_trans_remove_nolock(&st->trans_root, t);
567 list_move(&t->trans_list_entry, &head);
569 pthread_mutex_unlock(&st->trans_lock);
571 list_for_each_entry_safe(t, tmp, &head, trans_list_entry) {
572 list_del_init(&t->trans_list_entry);
574 t->cmd.flags = 0;
575 t->cmd.size = 0;
576 t->cmd.status = -ETIMEDOUT;
578 dnet_log(st->n, DNET_LOG_ERROR, "%s: destructing trans: %llu on TIMEOUT\n",
579 dnet_state_dump_addr(st), (unsigned long long)t->trans);
581 if (t->complete)
582 t->complete(st, &t->cmd, t->priv);
584 dnet_trans_put(t);
588 return &n->need_exit;
591 static void dnet_io_cleanup_states(struct dnet_node *n)
593 struct dnet_net_state *st, *tmp;
595 list_for_each_entry_safe(st, tmp, &n->storage_state_list, storage_state_entry) {
596 dnet_state_reset(st);
/*
 * Pairs a node with a thread number.
 * NOTE(review): no code visible in this file refers to this structure -
 * confirm whether it is still needed.
 */
struct dnet_io_process_data {
	struct dnet_node	*n;		/* owning node */
	int			thread_number;	/* number of the thread */
};
605 static void *dnet_io_process(void *data_)
607 struct dnet_work_io *wio = data_;
608 struct dnet_work_pool *pool = wio->pool;
609 struct dnet_node *n = pool->n;
610 struct dnet_net_state *st;
611 struct timespec ts;
612 struct timeval tv;
613 struct dnet_io_req *r;
614 int err;
616 dnet_set_name("io_pool");
618 while (!n->need_exit) {
619 r = NULL;
620 err = 0;
622 gettimeofday(&tv, NULL);
623 ts.tv_sec = tv.tv_sec + 1;
624 ts.tv_nsec = tv.tv_usec * 1000;
626 pthread_mutex_lock(&pool->lock);
628 if (!list_empty(&pool->list)) {
629 r = list_first_entry(&pool->list, struct dnet_io_req, req_entry);
630 } else {
631 err = pthread_cond_timedwait(&pool->wait, &pool->lock, &ts);
632 if (!list_empty(&pool->list)) {
633 r = list_first_entry(&pool->list, struct dnet_io_req, req_entry);
634 err = 0;
638 if (r) {
639 list_del_init(&r->req_entry);
640 atomic_dec(&pool->avail);
642 pthread_mutex_unlock(&pool->lock);
644 if (!r || err)
645 continue;
647 st = r->st;
649 dnet_log(n, DNET_LOG_DEBUG, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, mode: %s\n",
650 dnet_state_dump_addr(st), dnet_dump_id(r->header), r, r->hsize, r->dsize, dnet_work_io_mode_str(pool->mode));
652 err = dnet_process_recv(st, r);
654 dnet_io_req_free(r);
655 dnet_state_put(st);
657 atomic_inc(&pool->avail);
660 return NULL;
663 int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg)
665 int err, i;
666 struct dnet_io *io;
667 int io_size = sizeof(struct dnet_io) + sizeof(struct dnet_net_io) * cfg->net_thread_num;
669 io = malloc(io_size);
670 if (!io) {
671 err = -ENOMEM;
672 goto err_out_exit;
675 memset(io, 0, io_size);
677 io->net_thread_num = cfg->net_thread_num;
678 io->net_thread_pos = 0;
679 io->net = (struct dnet_net_io *)(io + 1);
681 io->recv_pool = dnet_work_pool_alloc(n, cfg->io_thread_num, DNET_WORK_IO_MODE_BLOCKING, dnet_io_process);
682 if (!io->recv_pool) {
683 err = -ENOMEM;
684 goto err_out_free;
687 io->recv_pool_nb = dnet_work_pool_alloc(n, cfg->nonblocking_io_thread_num, DNET_WORK_IO_MODE_NONBLOCKING, dnet_io_process);
688 if (!io->recv_pool_nb) {
689 err = -ENOMEM;
690 goto err_out_free_recv_pool;
693 for (i=0; i<io->net_thread_num; ++i) {
694 struct dnet_net_io *nio = &io->net[i];
696 nio->n = n;
698 nio->epoll_fd = epoll_create(10000);
699 if (nio->epoll_fd < 0) {
700 err = -errno;
701 dnet_log_err(n, "Failed to create epoll fd");
702 goto err_out_net_destroy;
705 fcntl(nio->epoll_fd, F_SETFD, FD_CLOEXEC);
706 fcntl(nio->epoll_fd, F_SETFL, O_NONBLOCK);
708 err = pthread_create(&nio->tid, NULL, dnet_io_process_network, nio);
709 if (err) {
710 close(nio->epoll_fd);
711 err = -err;
712 dnet_log(n, DNET_LOG_ERROR, "Failed to create network processing thread: %d\n", err);
713 goto err_out_net_destroy;
717 n->io = io;
718 return 0;
720 err_out_net_destroy:
721 while (--i >= 0) {
722 pthread_join(io->net[i].tid, NULL);
723 close(io->net[i].epoll_fd);
726 dnet_work_pool_cleanup(io->recv_pool_nb);
727 err_out_free_recv_pool:
728 dnet_work_pool_cleanup(io->recv_pool);
729 err_out_free:
730 free(io);
731 err_out_exit:
732 n->io = NULL;
733 return err;
736 void dnet_io_exit(struct dnet_node *n)
738 struct dnet_io *io = n->io;
739 int i;
741 n->need_exit = 1;
743 for (i=0; i<io->net_thread_num; ++i) {
744 pthread_join(io->net[i].tid, NULL);
745 close(io->net[i].epoll_fd);
748 dnet_work_pool_cleanup(io->recv_pool_nb);
749 dnet_work_pool_cleanup(io->recv_pool);
751 dnet_io_cleanup_states(n);
753 free(io);