Populate cache on READ command when DNET_IO_FLAGS_CACHE ioflag is set
[elliptics.git] / library / pool.c
blob a9cce9242d9f537713ebde7b1a29151e23549030
1 /*
2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/stat.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <signal.h>
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
27 static char *dnet_work_io_mode_string[] = {
28 [DNET_WORK_IO_MODE_BLOCKING] = "BLOCKING",
29 [DNET_WORK_IO_MODE_NONBLOCKING] = "NONBLOCKING",
32 static char *dnet_work_io_mode_str(int mode)
34 if (mode < 0 || mode >= (int)ARRAY_SIZE(dnet_work_io_mode_string))
35 return NULL;
37 return dnet_work_io_mode_string[mode];
40 static void dnet_schedule_io(struct dnet_node *n, struct dnet_io_req *r)
42 struct dnet_io *io = n->io;
43 struct dnet_cmd *cmd = r->header;
44 int nonblocking = !!(cmd->flags & DNET_FLAGS_NOLOCK);
45 struct dnet_work_pool *pool = io->recv_pool;
47 if (cmd->size > 0) {
48 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
49 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd),
50 (unsigned long long)cmd->size, nonblocking);
51 } else if ((cmd->size == 0) && !(cmd->flags & DNET_FLAGS_MORE) && (cmd->trans & DNET_TRANS_REPLY)) {
52 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV ACK: %s: nonblocking: %d\n",
53 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd), nonblocking);
54 } else {
55 unsigned long long tid = cmd->trans & ~DNET_TRANS_REPLY;
56 int reply = !!(cmd->trans & DNET_TRANS_REPLY);
58 dnet_log(r->st->n, DNET_LOG_DEBUG, "%s: %s: RECV: %s: nonblocking: %d, cmd-size: %llu, cflags: %llx, trans: %lld, reply: %d\n",
59 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(cmd->cmd), nonblocking,
60 (unsigned long long)cmd->size, (unsigned long long)cmd->flags, tid, reply);
64 if (nonblocking)
65 pool = io->recv_pool_nb;
66 if ((cmd->cmd == DNET_CMD_EXEC) && (cmd->size >= sizeof(struct sph))) {
67 struct sph *sph = (struct sph *)r->data;
68 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK)
69 pool = io->recv_pool_eblock;
72 pthread_mutex_lock(&pool->lock);
73 list_add_tail(&r->req_entry, &pool->list);
74 pthread_cond_broadcast(&pool->wait);
75 pthread_mutex_unlock(&pool->lock);
78 void dnet_schedule_command(struct dnet_net_state *st)
80 st->rcv_flags = DNET_IO_CMD;
82 if (st->rcv_data) {
83 #if 0
84 struct dnet_cmd *c = &st->rcv_cmd;
85 unsigned long long tid = c->trans & ~DNET_TRANS_REPLY;
86 dnet_log(st->n, DNET_LOG_DEBUG, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
87 (unsigned long long)c->size, tid, tid != c->trans, st->rcv_data);
88 #endif
89 free(st->rcv_data);
90 st->rcv_data = NULL;
93 st->rcv_end = sizeof(struct dnet_cmd);
94 st->rcv_offset = 0;
97 static int dnet_process_recv_single(struct dnet_net_state *st)
99 struct dnet_node *n = st->n;
100 struct dnet_io_req *r;
101 void *data;
102 uint64_t size;
103 int err;
105 again:
107 * Reading command first.
109 if (st->rcv_flags & DNET_IO_CMD)
110 data = &st->rcv_cmd;
111 else
112 data = st->rcv_data;
113 data += st->rcv_offset;
114 size = st->rcv_end - st->rcv_offset;
116 if (size) {
117 err = recv(st->read_s, data, size, 0);
118 if (err < 0) {
119 err = -EAGAIN;
120 if (errno != EAGAIN && errno != EINTR) {
121 err = -errno;
122 dnet_log_err(n, "failed to receive data, socket: %d", st->read_s);
123 goto out;
126 goto out;
129 if (err == 0) {
130 dnet_log(n, DNET_LOG_ERROR, "Peer %s has disconnected.\n",
131 dnet_server_convert_dnet_addr(&st->addr));
132 err = -ECONNRESET;
133 goto out;
136 st->rcv_offset += err;
139 if (st->rcv_offset != st->rcv_end)
140 goto again;
142 if (st->rcv_flags & DNET_IO_CMD) {
143 unsigned long long tid;
144 struct dnet_cmd *c = &st->rcv_cmd;
146 dnet_convert_cmd(c);
148 tid = c->trans & ~DNET_TRANS_REPLY;
150 dnet_log(n, DNET_LOG_DEBUG, "%s: received trans: %llu / %llx, "
151 "reply: %d, size: %llu, flags: %llx, status: %d.\n",
152 dnet_dump_id(&c->id), tid, (unsigned long long)c->trans,
153 !!(c->trans & DNET_TRANS_REPLY),
154 (unsigned long long)c->size, (unsigned long long)c->flags, c->status);
156 r = malloc(c->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_io_req));
157 if (!r) {
158 err = -ENOMEM;
159 goto out;
161 memset(r, 0, sizeof(struct dnet_io_req));
163 r->header = r + 1;
164 r->hsize = sizeof(struct dnet_cmd);
165 memcpy(r->header, &st->rcv_cmd, sizeof(struct dnet_cmd));
167 st->rcv_data = r;
168 st->rcv_offset = sizeof(struct dnet_io_req) + sizeof(struct dnet_cmd);
169 st->rcv_end = st->rcv_offset + c->size;
170 st->rcv_flags &= ~DNET_IO_CMD;
172 if (c->size) {
173 r->data = r->header + sizeof(struct dnet_cmd);
174 r->dsize = c->size;
177 * We read the command header, now get the data.
179 goto again;
183 r = st->rcv_data;
184 st->rcv_data = NULL;
186 dnet_schedule_command(st);
188 r->st = dnet_state_get(st);
190 dnet_schedule_io(n, r);
191 return 0;
193 out:
194 if (err != -EAGAIN && err != -EINTR)
195 dnet_schedule_command(st);
197 return err;
200 int dnet_state_accept_process(struct dnet_net_state *orig, struct epoll_event *ev __unused)
202 struct dnet_node *n = orig->n;
203 int err, cs;
204 struct dnet_addr addr;
205 struct dnet_net_state *st;
207 memset(&addr, 0, sizeof(addr));
209 addr.addr_len = sizeof(addr.addr);
210 cs = accept(orig->read_s, (struct sockaddr *)&addr.addr, &addr.addr_len);
211 if (cs <= 0) {
212 err = -errno;
213 if (err != -EAGAIN)
214 dnet_log_err(n, "failed to accept new client at %s", dnet_state_dump_addr(orig));
215 goto err_out_exit;
218 dnet_set_sockopt(cs);
220 st = dnet_state_create(n, 0, NULL, 0, &addr, cs, &err, 0, dnet_state_net_process);
221 if (!st) {
222 dnet_log(n, DNET_LOG_ERROR, "%s: Failed to create state for accepted client: %s [%d]\n",
223 dnet_server_convert_dnet_addr(&addr), strerror(-err), -err);
224 err = -EAGAIN;
225 goto err_out_exit;
228 dnet_log(n, DNET_LOG_INFO, "Accepted client %s, socket: %d.\n",
229 dnet_server_convert_dnet_addr(&addr), cs);
231 return 0;
232 /* socket is closed in dnet_state_create() */
233 err_out_exit:
234 return err;
237 void dnet_unschedule_send(struct dnet_net_state *st)
239 struct epoll_event ev;
241 ev.events = EPOLLOUT;
242 ev.data.ptr = st;
244 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->write_s, &ev);
247 void dnet_unschedule_recv(struct dnet_net_state *st)
249 struct epoll_event ev;
251 ev.events = EPOLLIN;
252 ev.data.ptr = st;
254 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->read_s, &ev);
257 static int dnet_process_send_single(struct dnet_net_state *st)
259 struct dnet_io_req *r = NULL;
260 int err;
262 while (1) {
263 r = NULL;
265 pthread_mutex_lock(&st->send_lock);
266 if (!list_empty(&st->send_list)) {
267 r = list_first_entry(&st->send_list, struct dnet_io_req, req_entry);
268 } else {
269 dnet_unschedule_send(st);
271 pthread_mutex_unlock(&st->send_lock);
273 if (!r) {
274 err = -EAGAIN;
275 goto err_out_exit;
278 err = dnet_send_request(st, r);
279 if (err)
280 goto err_out_exit;
283 err_out_exit:
284 return err;
287 static int dnet_schedule_network_io(struct dnet_net_state *st, int send)
289 struct epoll_event ev;
290 int err, fd;
292 if (send) {
293 ev.events = EPOLLOUT;
294 fd = st->write_s;
295 } else {
296 ev.events = EPOLLIN;
297 fd = st->read_s;
299 ev.data.ptr = st;
301 err = epoll_ctl(st->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
302 if (err < 0) {
303 err = -errno;
305 if (err == -EEXIST) {
306 err = 0;
307 } else {
308 dnet_log_err(st->n, "%s: failed to add %s event", dnet_state_dump_addr(st), send ? "SEND" : "RECV");
312 return err;
/*
 * Ask the network thread to report when the write socket of @st becomes
 * writable. Returns whatever dnet_schedule_network_io() returns.
 */
int dnet_schedule_send(struct dnet_net_state *st)
{
	const int send = 1;

	return dnet_schedule_network_io(st, send);
}
/*
 * Ask the network thread to report when the read socket of @st becomes
 * readable. Returns whatever dnet_schedule_network_io() returns.
 */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	const int send = 0;

	return dnet_schedule_network_io(st, send);
}
325 int dnet_state_net_process(struct dnet_net_state *st, struct epoll_event *ev)
327 int err = -ECONNRESET;
329 if (ev->events & EPOLLIN) {
330 err = dnet_process_recv_single(st);
331 if (err && (err != -EAGAIN))
332 goto err_out_exit;
334 if (ev->events & EPOLLOUT) {
335 err = dnet_process_send_single(st);
336 if (err && (err != -EAGAIN))
337 goto err_out_exit;
340 if (ev->events & (EPOLLHUP | EPOLLERR)) {
341 dnet_log(st->n, DNET_LOG_ERROR, "%s: received error event mask %x\n", dnet_state_dump_addr(st), ev->events);
342 err = -ECONNRESET;
344 err_out_exit:
345 return err;
348 static void *dnet_io_process_network(void *data_)
350 struct dnet_net_io *nio = data_;
351 struct dnet_node *n = nio->n;
352 struct dnet_net_state *st;
353 struct epoll_event ev;
354 int err = 0, check;
355 struct dnet_trans *t, *tmp;
356 struct timeval tv;
357 struct list_head head;
359 dnet_set_name("net_pool");
361 while (!n->need_exit) {
362 err = epoll_wait(nio->epoll_fd, &ev, 1, 1000);
363 if (err == 0)
364 continue;
366 if (err < 0) {
367 err = -errno;
369 if (err == -EAGAIN || err == -EINTR)
370 continue;
372 dnet_log_err(n, "Failed to wait for IO fds");
373 n->need_exit = err;
374 break;
377 st = ev.data.ptr;
378 st->epoll_fd = nio->epoll_fd;
379 check = st->stall;
381 while (1) {
382 err = st->process(st, &ev);
383 if (err == 0)
384 continue;
386 if (err == -EAGAIN && st->stall < DNET_DEFAULT_STALL_TRANSACTIONS)
387 break;
389 if (err < 0 || st->stall >= DNET_DEFAULT_STALL_TRANSACTIONS) {
390 dnet_state_reset(st);
391 check = 0;
392 break;
396 if (!check)
397 continue;
399 gettimeofday(&tv, NULL);
401 INIT_LIST_HEAD(&head);
403 pthread_mutex_lock(&st->trans_lock);
404 list_for_each_entry_safe(t, tmp, &st->trans_list, trans_list_entry) {
405 if (t->time.tv_sec >= tv.tv_sec)
406 break;
408 dnet_trans_remove_nolock(&st->trans_root, t);
409 list_move(&t->trans_list_entry, &head);
411 pthread_mutex_unlock(&st->trans_lock);
413 list_for_each_entry_safe(t, tmp, &head, trans_list_entry) {
414 list_del_init(&t->trans_list_entry);
416 t->cmd.flags = 0;
417 t->cmd.size = 0;
418 t->cmd.status = -ETIMEDOUT;
420 dnet_log(st->n, DNET_LOG_ERROR, "%s: destructing trans: %llu on TIMEOUT\n",
421 dnet_state_dump_addr(st), (unsigned long long)t->trans);
423 if (t->complete)
424 t->complete(st, &t->cmd, t->priv);
426 dnet_trans_put(t);
430 return &n->need_exit;
433 static void dnet_io_cleanup_states(struct dnet_node *n)
435 struct dnet_net_state *st, *tmp;
437 list_for_each_entry_safe(st, tmp, &n->storage_state_list, storage_state_entry) {
438 dnet_state_reset(st);
/*
 * Pairs a node with a thread number.
 * NOTE(review): nothing in this part of the file references this structure;
 * confirm whether it is still needed.
 */
struct dnet_io_process_data {
	struct dnet_node	*n;		/* node the thread belongs to */
	int			thread_number;	/* number of the thread */
};
447 static void *dnet_io_process(void *data_)
449 struct dnet_work_io *wio = data_;
450 struct dnet_work_pool *pool = wio->pool;
451 struct dnet_node *n = pool->n;
452 struct dnet_net_state *st;
453 struct timespec ts;
454 struct timeval tv;
455 struct dnet_io_req *r;
456 int err;
458 dnet_set_name("io_pool");
460 while (!n->need_exit) {
461 r = NULL;
462 err = 0;
464 gettimeofday(&tv, NULL);
465 ts.tv_sec = tv.tv_sec + 1;
466 ts.tv_nsec = tv.tv_usec * 1000;
468 pthread_mutex_lock(&pool->lock);
470 if (!list_empty(&pool->list)) {
471 r = list_first_entry(&pool->list, struct dnet_io_req, req_entry);
472 } else {
473 err = pthread_cond_timedwait(&pool->wait, &pool->lock, &ts);
474 if (!list_empty(&pool->list)) {
475 r = list_first_entry(&pool->list, struct dnet_io_req, req_entry);
476 err = 0;
480 if (r)
481 list_del_init(&r->req_entry);
482 pthread_mutex_unlock(&pool->lock);
484 if (!r || err)
485 continue;
487 st = r->st;
489 dnet_log(n, DNET_LOG_DEBUG, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, mode: %s\n",
490 dnet_state_dump_addr(st), dnet_dump_id(r->header), r, r->hsize, r->dsize, dnet_work_io_mode_str(pool->mode));
492 err = dnet_process_recv(st, r);
494 dnet_io_req_free(r);
495 dnet_state_put(st);
498 return NULL;
501 static void dnet_work_pool_cleanup(struct dnet_work_pool *pool)
503 struct dnet_io_req *r, *tmp;
504 int i;
506 for (i = 0; i < pool->num; ++i) {
507 struct dnet_work_io *wio = &pool->wio[i];
509 pthread_join(wio->tid, NULL);
512 list_for_each_entry_safe(r, tmp, &pool->list, req_entry) {
513 list_del(&r->req_entry);
514 dnet_io_req_free(r);
517 pthread_cond_destroy(&pool->wait);
518 pthread_mutex_destroy(&pool->lock);
519 free(pool);
522 static struct dnet_work_pool *dnet_work_pool_alloc(struct dnet_node *n, int num, int mode, void *(* process)(void *))
524 struct dnet_work_pool *pool;
525 int err, i;
527 pool = malloc(sizeof(struct dnet_work_pool) + num * sizeof(struct dnet_work_io));
528 if (!pool) {
529 err = -ENOMEM;
530 goto err_out_exit;
533 memset(pool, 0, sizeof(struct dnet_work_pool) + num * sizeof(struct dnet_work_io));
535 pool->num = num;
536 pool->mode = mode;
537 pool->n = n;
538 INIT_LIST_HEAD(&pool->list);
540 err = pthread_mutex_init(&pool->lock, NULL);
541 if (err) {
542 err = -err;
543 goto err_out_free;
546 err = pthread_cond_init(&pool->wait, NULL);
547 if (err) {
548 err = -err;
549 goto err_out_mutex_destroy;
552 for (i = 0; i < num; ++i) {
553 struct dnet_work_io *wio = &pool->wio[i];
555 wio->thread_index = i;
556 wio->pool = pool;
558 err = pthread_create(&wio->tid, NULL, process, wio);
559 if (err) {
560 err = -err;
561 dnet_log(n, DNET_LOG_ERROR, "Failed to create IO thread: %d\n", err);
562 goto err_out_io_threads;
566 return pool;
568 err_out_io_threads:
569 while (--i >= 0) {
570 struct dnet_work_io *wio = &pool->wio[i];
571 pthread_join(wio->tid, NULL);
573 pthread_cond_destroy(&pool->wait);
574 err_out_mutex_destroy:
575 pthread_mutex_destroy(&pool->lock);
576 err_out_free:
577 free(pool);
578 err_out_exit:
579 return NULL;
582 int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg)
584 int err, i;
585 struct dnet_io *io;
586 int io_size = sizeof(struct dnet_io) + sizeof(struct dnet_net_io) * cfg->net_thread_num;
588 io = malloc(io_size);
589 if (!io) {
590 err = -ENOMEM;
591 goto err_out_exit;
594 memset(io, 0, io_size);
596 io->net_thread_num = cfg->net_thread_num;
597 io->net_thread_pos = 0;
598 io->net = (struct dnet_net_io *)(io + 1);
600 io->recv_pool = dnet_work_pool_alloc(n, cfg->io_thread_num, DNET_WORK_IO_MODE_BLOCKING, dnet_io_process);
601 if (!io->recv_pool) {
602 err = -ENOMEM;
603 goto err_out_free;
606 io->recv_pool_nb = dnet_work_pool_alloc(n, cfg->nonblocking_io_thread_num, DNET_WORK_IO_MODE_NONBLOCKING, dnet_io_process);
607 if (!io->recv_pool_nb) {
608 err = -ENOMEM;
609 goto err_out_free_recv_pool;
612 io->recv_pool_eblock = dnet_work_pool_alloc(n, cfg->nonblocking_io_thread_num, DNET_WORK_IO_MODE_EXEC_BLOCKING, dnet_io_process);
613 if (!io->recv_pool_nb) {
614 err = -ENOMEM;
615 goto err_out_free_recv_pool_nb;
618 for (i=0; i<io->net_thread_num; ++i) {
619 struct dnet_net_io *nio = &io->net[i];
621 nio->n = n;
623 nio->epoll_fd = epoll_create(10000);
624 if (nio->epoll_fd < 0) {
625 err = -errno;
626 dnet_log_err(n, "Failed to create epoll fd");
627 goto err_out_net_destroy;
630 fcntl(nio->epoll_fd, F_SETFD, FD_CLOEXEC);
631 fcntl(nio->epoll_fd, F_SETFL, O_NONBLOCK);
633 err = pthread_create(&nio->tid, NULL, dnet_io_process_network, nio);
634 if (err) {
635 close(nio->epoll_fd);
636 err = -err;
637 dnet_log(n, DNET_LOG_ERROR, "Failed to create network processing thread: %d\n", err);
638 goto err_out_net_destroy;
642 n->io = io;
643 return 0;
645 err_out_net_destroy:
646 while (--i >= 0) {
647 pthread_join(io->net[i].tid, NULL);
648 close(io->net[i].epoll_fd);
651 dnet_work_pool_cleanup(io->recv_pool_eblock);
652 err_out_free_recv_pool_nb:
653 dnet_work_pool_cleanup(io->recv_pool_nb);
654 err_out_free_recv_pool:
655 dnet_work_pool_cleanup(io->recv_pool);
656 err_out_free:
657 free(io);
658 err_out_exit:
659 n->io = NULL;
660 return err;
663 void dnet_io_exit(struct dnet_node *n)
665 struct dnet_io *io = n->io;
666 int i;
668 n->need_exit = 1;
670 for (i=0; i<io->net_thread_num; ++i) {
671 pthread_join(io->net[i].tid, NULL);
672 close(io->net[i].epoll_fd);
675 dnet_work_pool_cleanup(io->recv_pool_eblock);
676 dnet_work_pool_cleanup(io->recv_pool_nb);
677 dnet_work_pool_cleanup(io->recv_pool);
679 dnet_io_cleanup_states(n);
681 free(io);