Get rid of redundant srw_load_ctl
[elliptics.git] / library / pool.c
blobe723188ffcf160643dd5759f186644f4a5f46f1b
1 /*
2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include "config.h"
18 #include <sys/stat.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <fcntl.h>
24 #include <signal.h>
26 #include "elliptics.h"
27 #include "elliptics/interface.h"
29 static void dnet_schedule_io(struct dnet_node *n, struct dnet_io_req *r)
31 struct dnet_io *io = n->io;
32 struct dnet_cmd *cmd = r->header;
33 int nonblocking = !!(cmd->flags & DNET_FLAGS_NOLOCK);
34 struct dnet_attr *attr = r->header + sizeof(struct dnet_cmd);
36 dnet_log(r->st->n, DNET_LOG_DSA, "%s: %s: RECV cmd: %s: cmd-size: %llu, nonblocking: %d\n",
37 dnet_state_dump_addr(r->st), dnet_dump_id(r->header), dnet_cmd_string(attr->cmd),
38 (unsigned long long)cmd->size, nonblocking);
40 pthread_mutex_lock(&io->recv_lock);
42 if (nonblocking)
43 list_add_tail(&r->req_entry, &io->nonblocking_recv_list);
44 else
45 list_add_tail(&r->req_entry, &io->recv_list);
47 pthread_cond_broadcast(&io->recv_wait);
48 pthread_mutex_unlock(&io->recv_lock);
51 void dnet_schedule_command(struct dnet_net_state *st)
53 st->rcv_flags = DNET_IO_CMD;
55 if (st->rcv_data) {
56 #if 0
57 struct dnet_cmd *c = &st->rcv_cmd;
58 unsigned long long tid = c->trans & ~DNET_TRANS_REPLY;
59 dnet_log(st->n, DNET_LOG_DSA, "freed: size: %llu, trans: %llu, reply: %d, ptr: %p.\n",
60 (unsigned long long)c->size, tid, tid != c->trans, st->rcv_data);
61 #endif
62 free(st->rcv_data);
63 st->rcv_data = NULL;
66 st->rcv_end = sizeof(struct dnet_cmd);
67 st->rcv_offset = 0;
70 static int dnet_process_recv_single(struct dnet_net_state *st)
72 struct dnet_node *n = st->n;
73 struct dnet_io_req *r;
74 void *data;
75 uint64_t size;
76 int err;
78 again:
80 * Reading command first.
82 if (st->rcv_flags & DNET_IO_CMD)
83 data = &st->rcv_cmd;
84 else
85 data = st->rcv_data;
86 data += st->rcv_offset;
87 size = st->rcv_end - st->rcv_offset;
89 if (size) {
90 err = recv(st->read_s, data, size, 0);
91 if (err < 0) {
92 err = -EAGAIN;
93 if (errno != EAGAIN && errno != EINTR) {
94 err = -errno;
95 dnet_log_err(n, "failed to receive data, socket: %d", st->read_s);
96 goto out;
99 goto out;
102 if (err == 0) {
103 dnet_log(n, DNET_LOG_ERROR, "Peer %s has disconnected.\n",
104 dnet_server_convert_dnet_addr(&st->addr));
105 err = -ECONNRESET;
106 goto out;
109 st->rcv_offset += err;
112 if (st->rcv_offset != st->rcv_end)
113 goto again;
115 if (st->rcv_flags & DNET_IO_CMD) {
116 unsigned long long tid;
117 struct dnet_cmd *c = &st->rcv_cmd;
119 dnet_convert_cmd(c);
121 tid = c->trans & ~DNET_TRANS_REPLY;
123 dnet_log(n, DNET_LOG_DSA, "%s: received trans: %llu / %llx, "
124 "reply: %d, size: %llu, flags: %x, status: %d.\n",
125 dnet_dump_id(&c->id), tid, (unsigned long long)c->trans,
126 !!(c->trans & DNET_TRANS_REPLY),
127 (unsigned long long)c->size, c->flags, c->status);
129 r = malloc(c->size + sizeof(struct dnet_cmd) + sizeof(struct dnet_io_req));
130 if (!r) {
131 err = -ENOMEM;
132 goto out;
134 memset(r, 0, sizeof(struct dnet_io_req));
136 r->header = r + 1;
137 r->hsize = sizeof(struct dnet_cmd);
138 memcpy(r->header, &st->rcv_cmd, sizeof(struct dnet_cmd));
140 st->rcv_data = r;
141 st->rcv_offset = sizeof(struct dnet_io_req) + sizeof(struct dnet_cmd);
142 st->rcv_end = st->rcv_offset + c->size;
143 st->rcv_flags &= ~DNET_IO_CMD;
145 if (c->size) {
146 r->data = r->header + sizeof(struct dnet_cmd);
147 r->dsize = c->size;
150 * We read the command header, now get the data.
152 goto again;
156 r = st->rcv_data;
157 st->rcv_data = NULL;
159 dnet_schedule_command(st);
161 r->st = dnet_state_get(st);
163 dnet_schedule_io(n, r);
164 return 0;
166 out:
167 if (err != -EAGAIN && err != -EINTR)
168 dnet_schedule_command(st);
170 return err;
173 int dnet_state_accept_process(struct dnet_net_state *orig, struct epoll_event *ev __unused)
175 struct dnet_node *n = orig->n;
176 int err, cs;
177 struct dnet_addr addr;
178 struct dnet_net_state *st;
180 memset(&addr, 0, sizeof(addr));
182 addr.addr_len = sizeof(addr.addr);
183 cs = accept(orig->read_s, (struct sockaddr *)&addr.addr, &addr.addr_len);
184 if (cs <= 0) {
185 err = -errno;
186 if (err != -EAGAIN)
187 dnet_log_err(n, "failed to accept new client at %s", dnet_state_dump_addr(orig));
188 goto err_out_exit;
191 dnet_set_sockopt(cs);
193 st = dnet_state_create(n, 0, NULL, 0, &addr, cs, &err, 0, dnet_state_net_process);
194 if (!st) {
195 dnet_log(n, DNET_LOG_ERROR, "%s: Failed to create state for accepted client: %s [%d]\n",
196 dnet_server_convert_dnet_addr(&addr), strerror(-err), -err);
197 err = -EAGAIN;
198 goto err_out_exit;
201 dnet_log(n, DNET_LOG_INFO, "Accepted client %s, socket: %d.\n",
202 dnet_server_convert_dnet_addr(&addr), cs);
204 return 0;
205 /* socket is closed in dnet_state_create() */
206 err_out_exit:
207 return err;
210 void dnet_unschedule_send(struct dnet_net_state *st)
212 struct epoll_event ev;
214 ev.events = EPOLLOUT;
215 ev.data.ptr = st;
217 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->write_s, &ev);
220 void dnet_unschedule_recv(struct dnet_net_state *st)
222 struct epoll_event ev;
224 ev.events = EPOLLIN;
225 ev.data.ptr = st;
227 epoll_ctl(st->epoll_fd, EPOLL_CTL_DEL, st->read_s, &ev);
230 static int dnet_process_send_single(struct dnet_net_state *st)
232 struct dnet_io_req *r = NULL;
233 int err;
235 while (1) {
236 r = NULL;
238 pthread_mutex_lock(&st->send_lock);
239 if (!list_empty(&st->send_list)) {
240 r = list_first_entry(&st->send_list, struct dnet_io_req, req_entry);
241 } else {
242 dnet_unschedule_send(st);
244 pthread_mutex_unlock(&st->send_lock);
246 if (!r) {
247 err = -EAGAIN;
248 goto err_out_exit;
251 err = dnet_send_request(st, r);
252 if (err)
253 goto err_out_exit;
256 err_out_exit:
257 return err;
260 static int dnet_schedule_network_io(struct dnet_net_state *st, int send)
262 struct epoll_event ev;
263 int err, fd;
265 if (send) {
266 ev.events = EPOLLOUT;
267 fd = st->write_s;
268 } else {
269 ev.events = EPOLLIN;
270 fd = st->read_s;
272 ev.data.ptr = st;
274 err = epoll_ctl(st->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
275 if (err < 0) {
276 err = -errno;
278 if (err == -EEXIST) {
279 err = 0;
280 } else {
281 dnet_log_err(st->n, "%s: failed to add %s event", dnet_state_dump_addr(st), send ? "SEND" : "RECV");
285 return err;
/*
 * Register the write socket of @st for EPOLLOUT events.
 * Returns the result of dnet_schedule_network_io().
 */
int dnet_schedule_send(struct dnet_net_state *st)
{
	const int want_send = 1;

	return dnet_schedule_network_io(st, want_send);
}
/*
 * Register the read socket of @st for EPOLLIN events.
 * Returns the result of dnet_schedule_network_io().
 */
int dnet_schedule_recv(struct dnet_net_state *st)
{
	const int want_send = 0;

	return dnet_schedule_network_io(st, want_send);
}
298 int dnet_state_net_process(struct dnet_net_state *st, struct epoll_event *ev)
300 int err = -ECONNRESET;
302 if (ev->events & EPOLLIN) {
303 err = dnet_process_recv_single(st);
304 if (err && (err != -EAGAIN))
305 goto err_out_exit;
307 if (ev->events & EPOLLOUT) {
308 err = dnet_process_send_single(st);
309 if (err && (err != -EAGAIN))
310 goto err_out_exit;
313 if (ev->events & (EPOLLHUP | EPOLLERR)) {
314 dnet_log(st->n, DNET_LOG_ERROR, "%s: received error event mask %x\n", dnet_state_dump_addr(st), ev->events);
315 err = -ECONNRESET;
317 err_out_exit:
318 return err;
321 static void *dnet_io_process(void *data_)
323 struct dnet_net_io *nio = data_;
324 struct dnet_node *n = nio->n;
325 struct dnet_net_state *st;
326 struct epoll_event ev;
327 int err = 0, check;
328 struct dnet_trans *t, *tmp;
329 struct timeval tv;
330 struct list_head head;
332 dnet_set_name("net_pool");
333 dnet_log(n, DNET_LOG_NOTICE, "Starting network processing thread.\n");
335 while (!n->need_exit) {
336 err = epoll_wait(nio->epoll_fd, &ev, 1, 1000);
337 if (err == 0)
338 continue;
340 if (err < 0) {
341 err = -errno;
343 if (err == -EAGAIN || err == -EINTR)
344 continue;
346 dnet_log_err(n, "Failed to wait for IO fds");
347 n->need_exit = err;
348 break;
351 st = ev.data.ptr;
352 st->epoll_fd = nio->epoll_fd;
353 check = st->stall;
355 while (1) {
356 err = st->process(st, &ev);
357 if (err == 0)
358 continue;
360 if (err == -EAGAIN && st->stall < DNET_DEFAULT_STALL_TRANSACTIONS)
361 break;
363 if (err < 0 || st->stall >= DNET_DEFAULT_STALL_TRANSACTIONS) {
364 dnet_state_reset(st);
365 check = 0;
366 break;
370 if (!check)
371 continue;
373 gettimeofday(&tv, NULL);
375 INIT_LIST_HEAD(&head);
377 pthread_mutex_lock(&st->trans_lock);
378 list_for_each_entry_safe(t, tmp, &st->trans_list, trans_list_entry) {
379 if (t->time.tv_sec >= tv.tv_sec)
380 break;
382 dnet_trans_remove_nolock(&st->trans_root, t);
383 list_move(&t->trans_list_entry, &head);
385 pthread_mutex_unlock(&st->trans_lock);
387 list_for_each_entry_safe(t, tmp, &head, trans_list_entry) {
388 list_del_init(&t->trans_list_entry);
390 t->cmd.flags = 0;
391 t->cmd.size = 0;
392 t->cmd.status = -ETIMEDOUT;
394 dnet_log(st->n, DNET_LOG_ERROR, "%s: destructing trans: %llu on TIMEOUT\n",
395 dnet_state_dump_addr(st), (unsigned long long)t->trans);
397 if (t->complete)
398 t->complete(st, &t->cmd, NULL, t->priv);
400 dnet_trans_put(t);
404 dnet_log(n, DNET_LOG_NOTICE, "Exiting network processing thread: need_exit: %d, err: %d.\n", n->need_exit, err);
405 return &n->need_exit;
408 static void dnet_io_cleanup_states(struct dnet_node *n)
410 struct dnet_net_state *st, *tmp;
412 list_for_each_entry_safe(st, tmp, &n->storage_state_list, storage_state_entry) {
413 dnet_state_reset(st);
/*
 * Pairs a node with a thread number.
 */
struct dnet_io_process_data {
	struct dnet_node	*n;		/* node */
	int			thread_number;	/* thread number */
};
422 static void *dnet_io_process_pool(void *data_)
424 struct dnet_work_io *wio = data_;
425 struct dnet_node *n = wio->n;
426 struct dnet_net_state *st;
427 struct dnet_io *io = n->io;
428 struct timespec ts;
429 struct timeval tv;
430 struct dnet_io_req *r;
431 struct list_head *head;
432 int err = 0;
434 dnet_log(n, DNET_LOG_NOTICE, "Starting %s IO processing thread.\n", wio->nonblocking ? "nonblocking" : "blocking");
435 dnet_set_name("io_pool");
437 while (!n->need_exit) {
438 r = NULL;
439 err = 0;
441 gettimeofday(&tv, NULL);
442 ts.tv_sec = tv.tv_sec + 1;
443 ts.tv_nsec = tv.tv_usec * 1000;
445 pthread_mutex_lock(&io->recv_lock);
446 head = &io->recv_list;
448 if (wio->nonblocking)
449 head = &io->nonblocking_recv_list;
451 if (!list_empty(head)) {
452 r = list_first_entry(head, struct dnet_io_req, req_entry);
453 } else {
454 err = pthread_cond_timedwait(&io->recv_wait, &io->recv_lock, &ts);
455 if (!list_empty(head)) {
456 r = list_first_entry(head, struct dnet_io_req, req_entry);
457 err = 0;
461 if (r)
462 list_del_init(&r->req_entry);
463 pthread_mutex_unlock(&io->recv_lock);
465 if (!r)
466 continue;
468 st = r->st;
470 dnet_log(n, DNET_LOG_DSA, "%s: %s: got IO event: %p: hsize: %zu, dsize: %zu, nonblocking: %d\n",
471 dnet_state_dump_addr(st), dnet_dump_id(r->header), r, r->hsize, r->dsize, wio->nonblocking);
473 err = dnet_process_recv(st, r);
475 dnet_io_req_free(r);
476 dnet_state_put(st);
479 dnet_log(n, DNET_LOG_DSA, "Exiting IO processing thread: need_exit: %d, err: %d.\n", n->need_exit, err);
480 return NULL;
483 int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg)
485 int err, i;
486 struct dnet_io *io;
487 int io_size = sizeof(struct dnet_io) +
488 sizeof(struct dnet_net_io) * cfg->net_thread_num +
489 sizeof(struct dnet_work_io) * (cfg->io_thread_num + cfg->nonblocking_io_thread_num);
491 io = malloc(io_size);
492 if (!io) {
493 err = -ENOMEM;
494 goto err_out_exit;
497 memset(io, 0, io_size);
499 io->nonblocking_thread_num = cfg->nonblocking_io_thread_num;
500 io->thread_num = cfg->io_thread_num;
501 io->net_thread_num = cfg->net_thread_num;
503 io->net = (struct dnet_net_io *)(io + 1);
504 io->wio = (struct dnet_work_io *)(io->net + cfg->net_thread_num);
506 INIT_LIST_HEAD(&io->recv_list);
507 INIT_LIST_HEAD(&io->nonblocking_recv_list);
508 n->io = io;
510 err = pthread_cond_init(&io->recv_wait, NULL);
511 if (err) {
512 err = -err;
513 dnet_log(n, DNET_LOG_ERROR, "Failed to initialize send cond: %d\n", err);
514 goto err_out_free;
517 err = pthread_mutex_init(&io->recv_lock, NULL);
518 if (err) {
519 err = -err;
520 dnet_log(n, DNET_LOG_ERROR, "Failed to initialize send lock: %d\n", err);
521 goto err_out_recv_cond;
524 for (i=0; i<io->net_thread_num; ++i) {
525 struct dnet_net_io *nio = &io->net[i];
527 nio->n = n;
529 nio->epoll_fd = epoll_create(10000);
530 if (nio->epoll_fd < 0) {
531 err = -errno;
532 dnet_log_err(n, "Failed to create epoll fd");
533 goto err_out_net_destroy;
536 fcntl(nio->epoll_fd, F_SETFD, FD_CLOEXEC);
537 fcntl(nio->epoll_fd, F_SETFL, O_NONBLOCK);
539 err = pthread_create(&nio->tid, NULL, dnet_io_process, nio);
540 if (err) {
541 close(nio->epoll_fd);
542 err = -err;
543 dnet_log(n, DNET_LOG_ERROR, "Failed to create network processing thread: %d\n", err);
544 goto err_out_net_destroy;
548 dnet_log(n, DNET_LOG_INFO, "Starting %d blocking threads and %d nonblocking threads\n", io->thread_num, io->nonblocking_thread_num);
549 for (i=0; i<io->thread_num + io->nonblocking_thread_num; ++i) {
550 struct dnet_work_io *wio = &io->wio[i];
552 wio->n = n;
553 wio->thread_index = i;
555 if (i >= io->thread_num)
556 wio->nonblocking = 1;
558 err = pthread_create(&wio->tid, NULL, dnet_io_process_pool, wio);
559 if (err) {
560 err = -err;
561 dnet_log(n, DNET_LOG_ERROR, "Failed to create IO thread: %d\n", err);
562 goto err_out_io_threads;
566 return 0;
568 err_out_io_threads:
569 while (--i >= 0)
570 pthread_join(io->wio[i].tid, NULL);
572 i = io->net_thread_num;
573 err_out_net_destroy:
574 while (--i >= 0) {
575 pthread_join(io->net[i].tid, NULL);
576 close(io->net[i].epoll_fd);
579 pthread_mutex_destroy(&io->recv_lock);
580 err_out_recv_cond:
581 pthread_cond_destroy(&io->recv_wait);
582 err_out_free:
583 free(io);
584 err_out_exit:
585 return err;
588 void dnet_io_exit(struct dnet_node *n)
590 struct dnet_io *io = n->io;
591 struct dnet_io_req *r, *tmp;
592 int i;
594 n->need_exit = 1;
596 for (i=0; i<io->thread_num + io->nonblocking_thread_num; ++i)
597 pthread_join(io->wio[i].tid, NULL);
599 for (i=0; i<io->net_thread_num; ++i) {
600 pthread_join(io->net[i].tid, NULL);
601 close(io->net[i].epoll_fd);
604 dnet_io_cleanup_states(n);
606 list_for_each_entry_safe(r, tmp, &io->recv_list, req_entry) {
607 list_del(&r->req_entry);
608 dnet_io_req_free(r);
611 list_for_each_entry_safe(r, tmp, &io->nonblocking_recv_list, req_entry) {
612 list_del(&r->req_entry);
613 dnet_io_req_free(r);
616 pthread_mutex_destroy(&io->recv_lock);
617 pthread_cond_destroy(&io->recv_wait);
619 free(io);