Only works with cocaine 0.9.4
[elliptics.git] / library / pool.c
blob1a99fa69224b7ff63908550fdf3035817edcfc62
1 /*
2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
#include <sys/stat.h>

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>

#include "elliptics.h"
#include "elliptics/interface.h"
27 static char *dnet_work_io_mode_string[] = {
28 [DNET_WORK_IO_MODE_BLOCKING] = "BLOCKING",
29 [DNET_WORK_IO_MODE_NONBLOCKING] = "NONBLOCKING",
32 static char *dnet_work_io_mode_str(int mode)
34 if (mode < 0 || mode >= (int)ARRAY_SIZE(dnet_work_io_mode_string))
35 return NULL;
37 return dnet_work_io_mode_string[mode];
40 static void dnet_work_pool_cleanup(struct dnet_work_pool *pool)
42 struct dnet_io_req *r, *tmp;
43 struct dnet_work_io *wio, *wio_tmp;
45 list_for_each_entry_safe(wio, wio_tmp, &pool->wio_list, wio_entry) {
46 pthread_join(wio->tid, NULL);
47 list_del(&wio->wio_entry);
48 free(wio);
52 list_for_each_entry_safe(r, tmp, &pool->list, req_entry) {
53 list_del(&r->req_entry);
54 dnet_io_req_free(r);
57 pthread_cond_destroy(&pool->wait);
58 pthread_mutex_destroy(&pool->lock);
59 free(pool);
62 static int dnet_work_pool_grow(struct dnet_node *n, struct dnet_work_pool *pool, int num, void *(* process)(void *))
64 int i, err;
65 struct dnet_work_io *wio, *tmp;
67 pthread_mutex_lock(&pool->lock);
69 for (i = 0; i < num; ++i) {
70 wio = malloc(sizeof(struct dnet_work_io));
71 if (!wio) {
72 err = -ENOMEM;
73 goto err_out_io_threads;
76 wio->thread_index = i;
77 wio->pool = pool;
78 list_add_tail(&wio->wio_entry, &pool->wio_list);
80 err = pthread_create(&wio->tid, NULL, process, wio);
81 if (err) {
82 err = -err;
83 dnet_log(n, DNET_LOG_ERROR, "Failed to create IO thread: %d\n", err);
84 goto err_out_io_threads;
88 dnet_log(n, DNET_LOG_INFO, "Grew %s pool by: %d -> %d IO threads\n",
89 dnet_work_io_mode_str(pool->mode), pool->num, pool->num + num);
91 pool->num += num;
92 pthread_mutex_unlock(&pool->lock);
94 return 0;
96 err_out_io_threads:
97 list_for_each_entry_safe(wio, tmp, &pool->wio_list, wio_entry) {
98 pthread_join(wio->tid, NULL);
99 list_del(&wio->wio_entry);
100 free(wio);
103 pthread_mutex_unlock(&pool->lock);
105 return err;
108 static struct dnet_work_pool *dnet_work_pool_alloc(struct dnet_node *n, int num, int mode, void *(* process)(void *))
110 struct dnet_work_pool *pool;
111 int err;
113 pool = malloc(sizeof(struct dnet_work_pool));
114 if (!pool) {
115 err = -ENOMEM;
116 goto err_out_exit;
119 memset(pool, 0, sizeof(struct dnet_work_pool));
121 pool->num = 0;
122 pool->mode = mode;
123 pool->n = n;
124 INIT_LIST_HEAD(&pool->list);
125 INIT_LIST_HEAD(&pool->wio_list);
127 err = pthread_mutex_init(&pool->lock, NULL);
128 if (err) {
129 err = -err;
130 goto err_out_free;
133 err = pthread_cond_init(&pool->wait, NULL);
134 if (err) {
135 err = -err;
136 goto err_out_mutex_destroy;
139 err = dnet_work_pool_grow(n, pool, num, process);
140 if (err)
141 goto err_out_cond_destroy;
143 return pool;
145 err_out_cond_destroy:
146 pthread_cond_destroy(&pool->wait);
147 err_out_mutex_destroy:
148 pthread_mutex_destroy(&pool->lock);
149 err_out_free:
150 free(pool);
151 err_out_exit:
152 return NULL;
155 static void *dnet_io_process(void *data_);
156 static void dnet_schedule_io(struct dnet_node *n, struct dnet_io_req *r)
158 struct dnet_io *io = n->io;
159 struct dnet_cmd *cmd = r->header;
160 int nonblocking = !!(cmd->flags & DNET_FLAGS_NOLOCK);
161 struct dnet_work_pool *pool = io->recv_pool;
163 if (cmd->size > 0) {
164 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
165 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd),
166 (unsigned long long)cmd->size, nonblocking);
167 } else if ((cmd->size == 0) && !(cmd->flags & DNET_FLAGS_MORE) && (cmd->trans & DNET_TRANS_REPLY)) {
168 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV ACK: %s: nonblocking: %d\n",
169 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd), nonblocking);
170 } else {
171 unsigned long long tid = cmd->trans & ~DNET_TRANS_REPLY;
172 int reply = !!(cmd->trans & DNET_TRANS_REPLY);
174 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV: %s: nonblocking: %d, cmd-size: %llu, cflags: %llx, trans: %lld, reply: %d\n",
175 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd), nonblocking,
176 (unsigned long long)cmd->size, (unsigned long long)cmd->flags, tid, reply);
180 if (nonblocking)
181 pool = io->recv_pool_nb;
183 if (list_empty(&pool->list) && (cmd->cmd == DNET_CMD_EXEC) && (cmd->size >= sizeof(struct sph))) {
184 struct sph *sph = (struct sph *)r->data;
185 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK) {
186 dnet_work_pool_grow(n, pool, pool->num/4+1, dnet_io_process);
190 pthread_mutex_lock(&pool->lock);
191 list_add_tail(&r->req_entry, &pool->list);
192 pthread_cond_broadcast(&pool->wait);
193 pthread_mutex_unlock(&pool->lock);
197 void dnet_schedule_command(struct dnet_net_state *st)
199 st->rcv_flags = DNET_IO_CMD;
201 if (st->rcv_data) {
202 #if 0
203 struct dnet_cmd *c = &st->rcv_cmd;
204 unsigned long long tid = c->trans & ~DNET_TRANS_REPLY;
205 dnet_log(st->n, DNET_LOG_DEBUG, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
206 (unsigned long long)c->size, tid, tid != c->trans, st->rcv_data);
207 #endif
208 free(st->rcv_data);
209 st->rcv_data = NULL;
212 st->rcv_end = sizeof(struct dnet_cmd);
213 st->rcv_offset = 0;
216 static int dnet_process_recv_single(struct dnet_net_state *st)
218 struct dnet_node *n = st->n;
219 struct dnet_io_req *r;
220 void *data;
221 uint64_t size;
222 int err;
224 again:
226 * Reading command first.
228 if (st->rcv_flags & DNET_IO_CMD)
229 data = &st->rcv_cmd;
230 else
231 data = st->rcv_data;
232 data += st->rcv_offset;
233 size = st->rcv_end - st->rcv_offset;
235 if (size) {
236 err = recv(st->read_s, data, size, 0);
237 if (err < 0) {
238 err = -EAGAIN;
239 if (errno != EAGAIN && errno != EINTR) {
240 err = -errno;
241 dnet_log_err(n, "failed to receive data, socket: %d", st->read_s);
242 goto out;
245 goto out;
248 if (err == 0) {
249 dnet_log(n, DNET_LOG_ERROR, "Peer %s has disconnected.\n",
250 dnet_server_convert_dnet_addr(&st->addr));
251 err = -ECONNRESET;
252 goto out;
255 st->rcv_offset += err;
258 if (st->rcv_offset != st->rcv_end)
259 goto again;
261 if (st->rcv_flags & DNET_IO_CMD) {
262 unsigned long long tid;
263 struct dnet_cmd *c = &st->rcv_cmd;
265 dnet_convert_cmd(c);
267 tid = c->trans & ~DNET_TRANS_REPLY;
269 dnet_log(n, DNET_LOG_DEBUG, "%s: received trans: %llu / %llx, "
270 "reply: %d, size: %llu, flags: %llx, status: %d.\n",
271 dnet_dump_id(&c->id), tid, (unsigned long long)c->trans,
272 !!(c->trans & DNET_TRANS_REPLY),
273 (unsigned long long)c->size, (unsigned long long)c->flags, c->status);
275 r = malloc(c->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_io_req));
276 if (!r) {
277 err = -ENOMEM;
278 goto out;
280 memset(r, 0, sizeof(struct dnet_io_req));
282 r->header = r + 1;
283 r->hsize = sizeof(struct dnet_cmd);
284 memcpy(r->header, &st->rcv_cmd, sizeof(struct dnet_cmd));
286 st->rcv_data = r;
287 st->rcv_offset = sizeof(struct dnet_io_req) + sizeof(struct dnet_cmd);
288 st->rcv_end = st->rcv_offset + c->size;
289 st->rcv_flags &= ~DNET_IO_CMD;
291 if (c->size) {
292 r->data = r->header + sizeof(struct dnet_cmd);
293 r->dsize = c->size;
296 * We read the command header, now get the data.
298 goto again;
302 r = st->rcv_data;
303 st->rcv_data = NULL;
305 dnet_schedule_command(st);
307 r->st = dnet_state_get(st);
309 dnet_schedule_io(n, r);
310 return 0;
312 out:
313 if (err != -EAGAIN && err != -EINTR)
314 dnet_schedule_command(st);
316 return err;
319 int dnet_state_accept_process(struct dnet_net_state *orig, struct epoll_event *ev __unused)
321 struct dnet_node *n = orig->n;
322 int err, cs;
323 struct dnet_addr addr;
324 struct dnet_net_state *st;
326 memset(&addr, 0, sizeof(addr));
328 addr.addr_len = sizeof(addr.addr);
329 cs = accept(orig->read_s, (struct sockaddr *)&addr.addr, &addr.addr_len);
330 if (cs <= 0) {
331 err = -errno;
332 if (err != -EAGAIN)
333 dnet_log_err(n, "failed to accept new client at %s", dnet_state_dump_addr(orig));
334 goto err_out_exit;
337 dnet_set_sockopt(cs);
339 st = dnet_state_create(n, 0, NULL, 0, &addr, cs, &err, 0, dnet_state_net_process);
340 if (!st) {
341 dnet_log(n, DNET_LOG_ERROR, "%s: Failed to create state for accepted client: %s [%d]\n",
342 dnet_server_convert_dnet_addr(&addr), strerror(-err), -err);
343 err = -EAGAIN;
344 goto err_out_exit;
347 dnet_log(n, DNET_LOG_INFO, "Accepted client %s, socket: %d.\n",
348 dnet_server_convert_dnet_addr(&addr), cs);
350 return 0;
351 /* socket is closed in dnet_state_create() */
352 err_out_exit:
353 return err;
356 void dnet_unschedule_send(struct dnet_net_state *st)
358 struct epoll_event ev;
360 ev.events = EPOLLOUT;
361 ev.data.ptr = st;
363 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->write_s, &ev);
366 void dnet_unschedule_recv(struct dnet_net_state *st)
368 struct epoll_event ev;
370 ev.events = EPOLLIN;
371 ev.data.ptr = st;
373 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->read_s, &ev);
376 static int dnet_process_send_single(struct dnet_net_state *st)
378 struct dnet_io_req *r = NULL;
379 int err;
381 while (1) {
382 r = NULL;
384 pthread_mutex_lock(&st->send_lock);
385 if (!list_empty(&st->send_list)) {
386 r = list_first_entry(&st->send_list, struct dnet_io_req, req_entry);
387 } else {
388 dnet_unschedule_send(st);
390 pthread_mutex_unlock(&st->send_lock);
392 if (!r) {
393 err = -EAGAIN;
394 goto err_out_exit;
397 err = dnet_send_request(st, r);
398 if (err)
399 goto err_out_exit;
402 err_out_exit:
403 return err;
406 static int dnet_schedule_network_io(struct dnet_net_state *st, int send)
408 struct epoll_event ev;
409 int err, fd;
411 if (send) {
412 ev.events = EPOLLOUT;
413 fd = st->write_s;
414 } else {
415 ev.events = EPOLLIN;
416 fd = st->read_s;
418 ev.data.ptr = st;
420 err = epoll_ctl(st->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
421 if (err < 0) {
422 err = -errno;
424 if (err == -EEXIST) {
425 err = 0;
426 } else {
427 dnet_log_err(st->n, "%s: failed to add %s event", dnet_state_dump_addr(st), send ? "SEND" : "RECV");
431 return err;
/* Register EPOLLOUT interest for st->write_s; see dnet_schedule_network_io(). */
int dnet_schedule_send(struct dnet_net_state *st)
{
	const int want_send = 1;

	return dnet_schedule_network_io(st, want_send);
}

/* Register EPOLLIN interest for st->read_s; see dnet_schedule_network_io(). */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	const int want_send = 0;

	return dnet_schedule_network_io(st, want_send);
}
444 int dnet_state_net_process(struct dnet_net_state *st, struct epoll_event *ev)
446 int err = -ECONNRESET;
448 if (ev->events & EPOLLIN) {
449 err = dnet_process_recv_single(st);
450 if (err && (err != -EAGAIN))
451 goto err_out_exit;
453 if (ev->events & EPOLLOUT) {
454 err = dnet_process_send_single(st);
455 if (err && (err != -EAGAIN))
456 goto err_out_exit;
459 if (ev->events & (EPOLLHUP | EPOLLERR)) {
460 dnet_log(st->n, DNET_LOG_ERROR, "%s: received error event mask %x\n", dnet_state_dump_addr(st), ev->events);
461 err = -ECONNRESET;
463 err_out_exit:
464 return err;
467 static void *dnet_io_process_network(void *data_)
469 struct dnet_net_io *nio = data_;
470 struct dnet_node *n = nio->n;
471 struct dnet_net_state *st;
472 struct epoll_event ev;
473 int err = 0, check;
474 struct dnet_trans *t, *tmp;
475 struct timeval tv;
476 struct list_head head;
478 dnet_set_name("net_pool");
480 while (!n->need_exit) {
481 err = epoll_wait(nio->epoll_fd, &ev, 1, 1000);
482 if (err == 0)
483 continue;
485 if (err < 0) {
486 err = -errno;
488 if (err == -EAGAIN || err == -EINTR)
489 continue;
491 dnet_log_err(n, "Failed to wait for IO fds");
492 n->need_exit = err;
493 break;
496 st = ev.data.ptr;
497 st->epoll_fd = nio->epoll_fd;
498 check = st->stall;
500 while (1) {
501 err = st->process(st, &ev);
502 if (err == 0)
503 continue;
505 if (err == -EAGAIN && st->stall < DNET_DEFAULT_STALL_TRANSACTIONS)
506 break;
508 if (err < 0 || st->stall >= DNET_DEFAULT_STALL_TRANSACTIONS) {
509 dnet_state_reset(st);
510 check = 0;
511 break;
515 if (!check)
516 continue;
518 gettimeofday(&tv, NULL);
520 INIT_LIST_HEAD(&head);
522 pthread_mutex_lock(&st->trans_lock);
523 list_for_each_entry_safe(t, tmp, &st->trans_list, trans_list_entry) {
524 if (t->time.tv_sec >= tv.tv_sec)
525 break;
527 dnet_trans_remove_nolock(&st->trans_root, t);
528 list_move(&t->trans_list_entry, &head);
530 pthread_mutex_unlock(&st->trans_lock);
532 list_for_each_entry_safe(t, tmp, &head, trans_list_entry) {
533 list_del_init(&t->trans_list_entry);
535 t->cmd.flags = 0;
536 t->cmd.size = 0;
537 t->cmd.status = -ETIMEDOUT;
539 dnet_log(st->n, DNET_LOG_ERROR, "%s: destructing trans: %llu on TIMEOUT\n",
540 dnet_state_dump_addr(st), (unsigned long long)t->trans);
542 if (t->complete)
543 t->complete(st, &t->cmd, t->priv);
545 dnet_trans_put(t);
549 return &n->need_exit;
552 static void dnet_io_cleanup_states(struct dnet_node *n)
554 struct dnet_net_state *st, *tmp;
556 list_for_each_entry_safe(st, tmp, &n->storage_state_list, storage_state_entry) {
557 dnet_state_reset(st);
/*
 * Node pointer plus a thread number; presumably start-up data for an IO
 * thread.  NOTE(review): nothing in this part of the file uses this type
 * (dnet_io_process() receives a struct dnet_work_io instead), so it may be
 * a leftover; confirm before removing.
 */
struct dnet_io_process_data {
	struct dnet_node	*n;		/* owning node */
	int			thread_number;	/* thread ordinal */
};
566 static void *dnet_io_process(void *data_)
568 struct dnet_work_io *wio = data_;
569 struct dnet_work_pool *pool = wio->pool;
570 struct dnet_node *n = pool->n;
571 struct dnet_net_state *st;
572 struct timespec ts;
573 struct timeval tv;
574 struct dnet_io_req *r;
575 int err;
577 dnet_set_name("io_pool");
579 while (!n->need_exit) {
580 r = NULL;
581 err = 0;
583 gettimeofday(&tv, NULL);
584 ts.tv_sec = tv.tv_sec + 1;
585 ts.tv_nsec = tv.tv_usec * 1000;
587 pthread_mutex_lock(&pool->lock);
589 if (!list_empty(&pool->list)) {
590 r = list_first_entry(&pool->list, struct dnet_io_req, req_entry);
591 } else {
592 err = pthread_cond_timedwait(&pool->wait, &pool->lock, &ts);
593 if (!list_empty(&pool->list)) {
594 r = list_first_entry(&pool->list, struct dnet_io_req, req_entry);
595 err = 0;
599 if (r)
600 list_del_init(&r->req_entry);
601 pthread_mutex_unlock(&pool->lock);
603 if (!r || err)
604 continue;
606 st = r->st;
608 dnet_log(n, DNET_LOG_DEBUG, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, mode: %s\n",
609 dnet_state_dump_addr(st), dnet_dump_id(r->header), r, r->hsize, r->dsize, dnet_work_io_mode_str(pool->mode));
611 err = dnet_process_recv(st, r);
613 dnet_io_req_free(r);
614 dnet_state_put(st);
617 return NULL;
620 int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg)
622 int err, i;
623 struct dnet_io *io;
624 int io_size = sizeof(struct dnet_io) + sizeof(struct dnet_net_io) * cfg->net_thread_num;
626 io = malloc(io_size);
627 if (!io) {
628 err = -ENOMEM;
629 goto err_out_exit;
632 memset(io, 0, io_size);
634 io->net_thread_num = cfg->net_thread_num;
635 io->net_thread_pos = 0;
636 io->net = (struct dnet_net_io *)(io + 1);
638 io->recv_pool = dnet_work_pool_alloc(n, cfg->io_thread_num, DNET_WORK_IO_MODE_BLOCKING, dnet_io_process);
639 if (!io->recv_pool) {
640 err = -ENOMEM;
641 goto err_out_free;
644 io->recv_pool_nb = dnet_work_pool_alloc(n, cfg->nonblocking_io_thread_num, DNET_WORK_IO_MODE_NONBLOCKING, dnet_io_process);
645 if (!io->recv_pool_nb) {
646 err = -ENOMEM;
647 goto err_out_free_recv_pool;
650 for (i=0; i<io->net_thread_num; ++i) {
651 struct dnet_net_io *nio = &io->net[i];
653 nio->n = n;
655 nio->epoll_fd = epoll_create(10000);
656 if (nio->epoll_fd < 0) {
657 err = -errno;
658 dnet_log_err(n, "Failed to create epoll fd");
659 goto err_out_net_destroy;
662 fcntl(nio->epoll_fd, F_SETFD, FD_CLOEXEC);
663 fcntl(nio->epoll_fd, F_SETFL, O_NONBLOCK);
665 err = pthread_create(&nio->tid, NULL, dnet_io_process_network, nio);
666 if (err) {
667 close(nio->epoll_fd);
668 err = -err;
669 dnet_log(n, DNET_LOG_ERROR, "Failed to create network processing thread: %d\n", err);
670 goto err_out_net_destroy;
674 n->io = io;
675 return 0;
677 err_out_net_destroy:
678 while (--i >= 0) {
679 pthread_join(io->net[i].tid, NULL);
680 close(io->net[i].epoll_fd);
683 dnet_work_pool_cleanup(io->recv_pool_nb);
684 err_out_free_recv_pool:
685 dnet_work_pool_cleanup(io->recv_pool);
686 err_out_free:
687 free(io);
688 err_out_exit:
689 n->io = NULL;
690 return err;
693 void dnet_io_exit(struct dnet_node *n)
695 struct dnet_io *io = n->io;
696 int i;
698 n->need_exit = 1;
700 for (i=0; i<io->net_thread_num; ++i) {
701 pthread_join(io->net[i].tid, NULL);
702 close(io->net[i].epoll_fd);
705 dnet_work_pool_cleanup(io->recv_pool_nb);
706 dnet_work_pool_cleanup(io->recv_pool);
708 dnet_io_cleanup_states(n);
710 free(io);