Tune nonblocking pool growing policy
[elliptics.git] / library / pool.c
blobfc26a944b9b74f58e60a44bcd14196357eefaca5
/*
 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
 * All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
16 #include <sys/stat.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <signal.h>
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
27 static char *dnet_work_io_mode_string[] = {
28 [DNET_WORK_IO_MODE_BLOCKING] = "BLOCKING",
29 [DNET_WORK_IO_MODE_NONBLOCKING] = "NONBLOCKING",
32 static char *dnet_work_io_mode_str(int mode)
34 if (mode < 0 || mode >= (int)ARRAY_SIZE(dnet_work_io_mode_string))
35 return NULL;
37 return dnet_work_io_mode_string[mode];
40 static void dnet_work_pool_cleanup(struct dnet_work_pool *pool)
42 struct dnet_io_req *r, *tmp;
43 struct dnet_work_io *wio, *wio_tmp;
45 list_for_each_entry_safe(wio, wio_tmp, &pool->wio_list, wio_entry) {
46 pthread_join(wio->tid, NULL);
47 list_del(&wio->wio_entry);
48 free(wio);
52 list_for_each_entry_safe(r, tmp, &pool->list, req_entry) {
53 list_del(&r->req_entry);
54 dnet_io_req_free(r);
57 pthread_cond_destroy(&pool->wait);
58 pthread_mutex_destroy(&pool->lock);
59 free(pool);
62 static int dnet_work_pool_grow(struct dnet_node *n, struct dnet_work_pool *pool, int num, void *(* process)(void *))
64 int i, err;
65 struct dnet_work_io *wio, *tmp;
67 pthread_mutex_lock(&pool->lock);
69 for (i = 0; i < num; ++i) {
70 wio = malloc(sizeof(struct dnet_work_io));
71 if (!wio) {
72 err = -ENOMEM;
73 goto err_out_io_threads;
76 wio->thread_index = i;
77 wio->pool = pool;
78 list_add_tail(&wio->wio_entry, &pool->wio_list);
80 err = pthread_create(&wio->tid, NULL, process, wio);
81 if (err) {
82 err = -err;
83 dnet_log(n, DNET_LOG_ERROR, "Failed to create IO thread: %d\n", err);
84 goto err_out_io_threads;
88 dnet_log(n, DNET_LOG_INFO, "Grew %s pool by: %d -> %d IO threads\n",
89 dnet_work_io_mode_str(pool->mode), pool->num, pool->num + num);
91 atomic_add(&pool->avail, num);
92 pool->num += num;
93 pthread_mutex_unlock(&pool->lock);
95 return 0;
97 err_out_io_threads:
98 list_for_each_entry_safe(wio, tmp, &pool->wio_list, wio_entry) {
99 pthread_join(wio->tid, NULL);
100 list_del(&wio->wio_entry);
101 free(wio);
104 pthread_mutex_unlock(&pool->lock);
106 return err;
109 static struct dnet_work_pool *dnet_work_pool_alloc(struct dnet_node *n, int num, int mode, void *(* process)(void *))
111 struct dnet_work_pool *pool;
112 int err;
114 pool = malloc(sizeof(struct dnet_work_pool));
115 if (!pool) {
116 err = -ENOMEM;
117 goto err_out_exit;
120 memset(pool, 0, sizeof(struct dnet_work_pool));
122 pool->num = 0;
123 atomic_set(&pool->avail, 0);
124 pool->mode = mode;
125 pool->n = n;
126 INIT_LIST_HEAD(&pool->list);
127 INIT_LIST_HEAD(&pool->wio_list);
129 err = pthread_mutex_init(&pool->lock, NULL);
130 if (err) {
131 err = -err;
132 goto err_out_free;
135 err = pthread_cond_init(&pool->wait, NULL);
136 if (err) {
137 err = -err;
138 goto err_out_mutex_destroy;
141 err = dnet_work_pool_grow(n, pool, num, process);
142 if (err)
143 goto err_out_cond_destroy;
145 return pool;
147 err_out_cond_destroy:
148 pthread_cond_destroy(&pool->wait);
149 err_out_mutex_destroy:
150 pthread_mutex_destroy(&pool->lock);
151 err_out_free:
152 free(pool);
153 err_out_exit:
154 return NULL;
157 static void *dnet_io_process(void *data_);
158 static void dnet_schedule_io(struct dnet_node *n, struct dnet_io_req *r)
160 struct dnet_io *io = n->io;
161 struct dnet_cmd *cmd = r->header;
162 int nonblocking = !!(cmd->flags & DNET_FLAGS_NOLOCK);
163 struct dnet_work_pool *pool = io->recv_pool;
165 if (cmd->size > 0) {
166 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
167 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd),
168 (unsigned long long)cmd->size, nonblocking);
169 } else if ((cmd->size == 0) && !(cmd->flags & DNET_FLAGS_MORE) && (cmd->trans & DNET_TRANS_REPLY)) {
170 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV ACK: %s: nonblocking: %d\n",
171 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd), nonblocking);
172 } else {
173 unsigned long long tid = cmd->trans & ~DNET_TRANS_REPLY;
174 int reply = !!(cmd->trans & DNET_TRANS_REPLY);
176 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV: %s: nonblocking: %d, cmd-size: %llu, cflags: %llx, trans: %lld, reply: %d\n",
177 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd), nonblocking,
178 (unsigned long long)cmd->size, (unsigned long long)cmd->flags, tid, reply);
182 if (nonblocking)
183 pool = io->recv_pool_nb;
185 #define cmd_is_exec_match(__cmd) (((__cmd)->cmd == DNET_CMD_EXEC) && ((__cmd)->size >= sizeof(struct sph)) && !((__cmd)->trans & DNET_TRANS_REPLY))
187 if (!list_empty(&pool->list) && cmd_is_exec_match(cmd)) {
188 int pool_has_blocked_sph = 0;
189 struct dnet_io_req *tmp;
190 struct sph *sph;
191 int edge_num = pool->num / 4 + 1;
193 pthread_mutex_lock(&pool->lock);
194 list_for_each_entry(tmp, &pool->list, req_entry) {
195 struct dnet_cmd *tmp_cmd = tmp->header;
196 unsigned long long tid = tmp_cmd->trans & ~DNET_TRANS_REPLY;
197 int reply = !!(tmp_cmd->trans & DNET_TRANS_REPLY);
198 unsigned long long sph_flags = 0;
199 int sph_match = 0;
201 if (cmd_is_exec_match(tmp_cmd)) {
202 sph = (struct sph *)tmp->data;
203 sph_flags = sph->flags;
204 sph_match = 1;
208 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: pool-grow: %s: cmd-size: %llu, cflags: %llx, "
209 "trans: %lld, reply: %d, sph-flags: %llx (match: %d), avail: %d\n",
210 dnet_state_dump_addr(tmp->st), dnet_dump_id(tmp->header), dnet_cmd_string(tmp_cmd->cmd),
211 (unsigned long long)tmp_cmd->size, (unsigned long long)tmp_cmd->flags,
212 tid, reply, sph_flags, sph_match, atomic_read(&pool->avail));
214 if (cmd_is_exec_match(tmp_cmd)) {
215 sph = (struct sph *)tmp->data;
216 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK) {
217 pool_has_blocked_sph = 1;
218 break;
222 pthread_mutex_unlock(&pool->lock);
224 sph = (struct sph *)r->data;
225 if ((sph->flags & DNET_SPH_FLAGS_SRC_BLOCK) && pool_has_blocked_sph && (atomic_read(&pool->avail) < edge_num)) {
226 dnet_work_pool_grow(n, pool, edge_num, dnet_io_process);
230 pthread_mutex_lock(&pool->lock);
231 list_add_tail(&r->req_entry, &pool->list);
232 pthread_cond_broadcast(&pool->wait);
233 pthread_mutex_unlock(&pool->lock);
237 void dnet_schedule_command(struct dnet_net_state *st)
239 st->rcv_flags = DNET_IO_CMD;
241 if (st->rcv_data) {
242 #if 0
243 struct dnet_cmd *c = &st->rcv_cmd;
244 unsigned long long tid = c->trans & ~DNET_TRANS_REPLY;
245 dnet_log(st->n, DNET_LOG_DEBUG, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
246 (unsigned long long)c->size, tid, tid != c->trans, st->rcv_data);
247 #endif
248 free(st->rcv_data);
249 st->rcv_data = NULL;
252 st->rcv_end = sizeof(struct dnet_cmd);
253 st->rcv_offset = 0;
256 static int dnet_process_recv_single(struct dnet_net_state *st)
258 struct dnet_node *n = st->n;
259 struct dnet_io_req *r;
260 void *data;
261 uint64_t size;
262 int err;
264 again:
266 * Reading command first.
268 if (st->rcv_flags & DNET_IO_CMD)
269 data = &st->rcv_cmd;
270 else
271 data = st->rcv_data;
272 data += st->rcv_offset;
273 size = st->rcv_end - st->rcv_offset;
275 if (size) {
276 err = recv(st->read_s, data, size, 0);
277 if (err < 0) {
278 err = -EAGAIN;
279 if (errno != EAGAIN && errno != EINTR) {
280 err = -errno;
281 dnet_log_err(n, "failed to receive data, socket: %d", st->read_s);
282 goto out;
285 goto out;
288 if (err == 0) {
289 dnet_log(n, DNET_LOG_ERROR, "Peer %s has disconnected.\n",
290 dnet_server_convert_dnet_addr(&st->addr));
291 err = -ECONNRESET;
292 goto out;
295 st->rcv_offset += err;
298 if (st->rcv_offset != st->rcv_end)
299 goto again;
301 if (st->rcv_flags & DNET_IO_CMD) {
302 unsigned long long tid;
303 struct dnet_cmd *c = &st->rcv_cmd;
305 dnet_convert_cmd(c);
307 tid = c->trans & ~DNET_TRANS_REPLY;
309 dnet_log(n, DNET_LOG_DEBUG, "%s: received trans: %llu / %llx, "
310 "reply: %d, size: %llu, flags: %llx, status: %d.\n",
311 dnet_dump_id(&c->id), tid, (unsigned long long)c->trans,
312 !!(c->trans & DNET_TRANS_REPLY),
313 (unsigned long long)c->size, (unsigned long long)c->flags, c->status);
315 r = malloc(c->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_io_req));
316 if (!r) {
317 err = -ENOMEM;
318 goto out;
320 memset(r, 0, sizeof(struct dnet_io_req));
322 r->header = r + 1;
323 r->hsize = sizeof(struct dnet_cmd);
324 memcpy(r->header, &st->rcv_cmd, sizeof(struct dnet_cmd));
326 st->rcv_data = r;
327 st->rcv_offset = sizeof(struct dnet_io_req) + sizeof(struct dnet_cmd);
328 st->rcv_end = st->rcv_offset + c->size;
329 st->rcv_flags &= ~DNET_IO_CMD;
331 if (c->size) {
332 r->data = r->header + sizeof(struct dnet_cmd);
333 r->dsize = c->size;
336 * We read the command header, now get the data.
338 goto again;
342 r = st->rcv_data;
343 st->rcv_data = NULL;
345 dnet_schedule_command(st);
347 r->st = dnet_state_get(st);
349 dnet_schedule_io(n, r);
350 return 0;
352 out:
353 if (err != -EAGAIN && err != -EINTR)
354 dnet_schedule_command(st);
356 return err;
359 int dnet_state_accept_process(struct dnet_net_state *orig, struct epoll_event *ev __unused)
361 struct dnet_node *n = orig->n;
362 int err, cs;
363 struct dnet_addr addr;
364 struct dnet_net_state *st;
366 memset(&addr, 0, sizeof(addr));
368 addr.addr_len = sizeof(addr.addr);
369 cs = accept(orig->read_s, (struct sockaddr *)&addr.addr, &addr.addr_len);
370 if (cs <= 0) {
371 err = -errno;
372 if (err != -EAGAIN)
373 dnet_log_err(n, "failed to accept new client at %s", dnet_state_dump_addr(orig));
374 goto err_out_exit;
377 dnet_set_sockopt(cs);
379 st = dnet_state_create(n, 0, NULL, 0, &addr, cs, &err, 0, dnet_state_net_process);
380 if (!st) {
381 dnet_log(n, DNET_LOG_ERROR, "%s: Failed to create state for accepted client: %s [%d]\n",
382 dnet_server_convert_dnet_addr(&addr), strerror(-err), -err);
383 err = -EAGAIN;
384 goto err_out_exit;
387 dnet_log(n, DNET_LOG_INFO, "Accepted client %s, socket: %d.\n",
388 dnet_server_convert_dnet_addr(&addr), cs);
390 return 0;
391 /* socket is closed in dnet_state_create() */
392 err_out_exit:
393 return err;
396 void dnet_unschedule_send(struct dnet_net_state *st)
398 struct epoll_event ev;
400 ev.events = EPOLLOUT;
401 ev.data.ptr = st;
403 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->write_s, &ev);
406 void dnet_unschedule_recv(struct dnet_net_state *st)
408 struct epoll_event ev;
410 ev.events = EPOLLIN;
411 ev.data.ptr = st;
413 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->read_s, &ev);
416 static int dnet_process_send_single(struct dnet_net_state *st)
418 struct dnet_io_req *r = NULL;
419 int err;
421 while (1) {
422 r = NULL;
424 pthread_mutex_lock(&st->send_lock);
425 if (!list_empty(&st->send_list)) {
426 r = list_first_entry(&st->send_list, struct dnet_io_req, req_entry);
427 } else {
428 dnet_unschedule_send(st);
430 pthread_mutex_unlock(&st->send_lock);
432 if (!r) {
433 err = -EAGAIN;
434 goto err_out_exit;
437 err = dnet_send_request(st, r);
438 if (err)
439 goto err_out_exit;
442 err_out_exit:
443 return err;
446 static int dnet_schedule_network_io(struct dnet_net_state *st, int send)
448 struct epoll_event ev;
449 int err, fd;
451 if (send) {
452 ev.events = EPOLLOUT;
453 fd = st->write_s;
454 } else {
455 ev.events = EPOLLIN;
456 fd = st->read_s;
458 ev.data.ptr = st;
460 err = epoll_ctl(st->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
461 if (err < 0) {
462 err = -errno;
464 if (err == -EEXIST) {
465 err = 0;
466 } else {
467 dnet_log_err(st->n, "%s: failed to add %s event", dnet_state_dump_addr(st), send ? "SEND" : "RECV");
471 return err;
/* Registers the write socket of @st for EPOLLOUT events. */
int dnet_schedule_send(struct dnet_net_state *st)
{
	const int want_send = 1;

	return dnet_schedule_network_io(st, want_send);
}
/* Registers the read socket of @st for EPOLLIN events. */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	const int want_send = 0;

	return dnet_schedule_network_io(st, want_send);
}
484 int dnet_state_net_process(struct dnet_net_state *st, struct epoll_event *ev)
486 int err = -ECONNRESET;
488 if (ev->events & EPOLLIN) {
489 err = dnet_process_recv_single(st);
490 if (err && (err != -EAGAIN))
491 goto err_out_exit;
493 if (ev->events & EPOLLOUT) {
494 err = dnet_process_send_single(st);
495 if (err && (err != -EAGAIN))
496 goto err_out_exit;
499 if (ev->events & (EPOLLHUP | EPOLLERR)) {
500 dnet_log(st->n, DNET_LOG_ERROR, "%s: received error event mask %x\n", dnet_state_dump_addr(st), ev->events);
501 err = -ECONNRESET;
503 err_out_exit:
504 return err;
507 static void *dnet_io_process_network(void *data_)
509 struct dnet_net_io *nio = data_;
510 struct dnet_node *n = nio->n;
511 struct dnet_net_state *st;
512 struct epoll_event ev;
513 int err = 0, check;
514 struct dnet_trans *t, *tmp;
515 struct timeval tv;
516 struct list_head head;
518 dnet_set_name("net_pool");
520 while (!n->need_exit) {
521 err = epoll_wait(nio->epoll_fd, &ev, 1, 1000);
522 if (err == 0)
523 continue;
525 if (err < 0) {
526 err = -errno;
528 if (err == -EAGAIN || err == -EINTR)
529 continue;
531 dnet_log_err(n, "Failed to wait for IO fds");
532 n->need_exit = err;
533 break;
536 st = ev.data.ptr;
537 st->epoll_fd = nio->epoll_fd;
538 check = st->stall;
540 while (1) {
541 err = st->process(st, &ev);
542 if (err == 0)
543 continue;
545 if (err == -EAGAIN && st->stall < DNET_DEFAULT_STALL_TRANSACTIONS)
546 break;
548 if (err < 0 || st->stall >= DNET_DEFAULT_STALL_TRANSACTIONS) {
549 dnet_state_reset(st);
550 check = 0;
551 break;
555 if (!check)
556 continue;
558 gettimeofday(&tv, NULL);
560 INIT_LIST_HEAD(&head);
562 pthread_mutex_lock(&st->trans_lock);
563 list_for_each_entry_safe(t, tmp, &st->trans_list, trans_list_entry) {
564 if (t->time.tv_sec >= tv.tv_sec)
565 break;
567 dnet_trans_remove_nolock(&st->trans_root, t);
568 list_move(&t->trans_list_entry, &head);
570 pthread_mutex_unlock(&st->trans_lock);
572 list_for_each_entry_safe(t, tmp, &head, trans_list_entry) {
573 list_del_init(&t->trans_list_entry);
575 t->cmd.flags = 0;
576 t->cmd.size = 0;
577 t->cmd.status = -ETIMEDOUT;
579 dnet_log(st->n, DNET_LOG_ERROR, "%s: destructing trans: %llu on TIMEOUT\n",
580 dnet_state_dump_addr(st), (unsigned long long)t->trans);
582 if (t->complete)
583 t->complete(st, &t->cmd, t->priv);
585 dnet_trans_put(t);
589 return &n->need_exit;
592 static void dnet_io_cleanup_states(struct dnet_node *n)
594 struct dnet_net_state *st, *tmp;
596 list_for_each_entry_safe(st, tmp, &n->storage_state_list, storage_state_entry) {
597 dnet_state_reset(st);
/*
 * Pairs a node with a thread number.
 * NOTE(review): not referenced anywhere in the visible part of this file.
 */
struct dnet_io_process_data {
	struct dnet_node	*n;
	int			thread_number;
};
606 static void *dnet_io_process(void *data_)
608 struct dnet_work_io *wio = data_;
609 struct dnet_work_pool *pool = wio->pool;
610 struct dnet_node *n = pool->n;
611 struct dnet_net_state *st;
612 struct timespec ts;
613 struct timeval tv;
614 struct dnet_io_req *r;
615 int err;
617 dnet_set_name("io_pool");
619 while (!n->need_exit) {
620 r = NULL;
621 err = 0;
623 gettimeofday(&tv, NULL);
624 ts.tv_sec = tv.tv_sec + 1;
625 ts.tv_nsec = tv.tv_usec * 1000;
627 pthread_mutex_lock(&pool->lock);
629 if (!list_empty(&pool->list)) {
630 r = list_first_entry(&pool->list, struct dnet_io_req, req_entry);
631 } else {
632 err = pthread_cond_timedwait(&pool->wait, &pool->lock, &ts);
633 if (!list_empty(&pool->list)) {
634 r = list_first_entry(&pool->list, struct dnet_io_req, req_entry);
635 err = 0;
639 if (r) {
640 list_del_init(&r->req_entry);
641 atomic_dec(&pool->avail);
643 pthread_mutex_unlock(&pool->lock);
645 if (!r || err)
646 continue;
648 st = r->st;
650 dnet_log(n, DNET_LOG_DEBUG, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, mode: %s\n",
651 dnet_state_dump_addr(st), dnet_dump_id(r->header), r, r->hsize, r->dsize, dnet_work_io_mode_str(pool->mode));
653 err = dnet_process_recv(st, r);
655 dnet_io_req_free(r);
656 dnet_state_put(st);
658 atomic_inc(&pool->avail);
661 return NULL;
664 int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg)
666 int err, i;
667 struct dnet_io *io;
668 int io_size = sizeof(struct dnet_io) + sizeof(struct dnet_net_io) * cfg->net_thread_num;
670 io = malloc(io_size);
671 if (!io) {
672 err = -ENOMEM;
673 goto err_out_exit;
676 memset(io, 0, io_size);
678 io->net_thread_num = cfg->net_thread_num;
679 io->net_thread_pos = 0;
680 io->net = (struct dnet_net_io *)(io + 1);
682 io->recv_pool = dnet_work_pool_alloc(n, cfg->io_thread_num, DNET_WORK_IO_MODE_BLOCKING, dnet_io_process);
683 if (!io->recv_pool) {
684 err = -ENOMEM;
685 goto err_out_free;
688 io->recv_pool_nb = dnet_work_pool_alloc(n, cfg->nonblocking_io_thread_num, DNET_WORK_IO_MODE_NONBLOCKING, dnet_io_process);
689 if (!io->recv_pool_nb) {
690 err = -ENOMEM;
691 goto err_out_free_recv_pool;
694 for (i=0; i<io->net_thread_num; ++i) {
695 struct dnet_net_io *nio = &io->net[i];
697 nio->n = n;
699 nio->epoll_fd = epoll_create(10000);
700 if (nio->epoll_fd < 0) {
701 err = -errno;
702 dnet_log_err(n, "Failed to create epoll fd");
703 goto err_out_net_destroy;
706 fcntl(nio->epoll_fd, F_SETFD, FD_CLOEXEC);
707 fcntl(nio->epoll_fd, F_SETFL, O_NONBLOCK);
709 err = pthread_create(&nio->tid, NULL, dnet_io_process_network, nio);
710 if (err) {
711 close(nio->epoll_fd);
712 err = -err;
713 dnet_log(n, DNET_LOG_ERROR, "Failed to create network processing thread: %d\n", err);
714 goto err_out_net_destroy;
718 n->io = io;
719 return 0;
721 err_out_net_destroy:
722 while (--i >= 0) {
723 pthread_join(io->net[i].tid, NULL);
724 close(io->net[i].epoll_fd);
727 dnet_work_pool_cleanup(io->recv_pool_nb);
728 err_out_free_recv_pool:
729 dnet_work_pool_cleanup(io->recv_pool);
730 err_out_free:
731 free(io);
732 err_out_exit:
733 n->io = NULL;
734 return err;
737 void dnet_io_exit(struct dnet_node *n)
739 struct dnet_io *io = n->io;
740 int i;
742 n->need_exit = 1;
744 for (i=0; i<io->net_thread_num; ++i) {
745 pthread_join(io->net[i].tid, NULL);
746 close(io->net[i].epoll_fd);
749 dnet_work_pool_cleanup(io->recv_pool_nb);
750 dnet_work_pool_cleanup(io->recv_pool);
752 dnet_io_cleanup_states(n);
754 free(io);