Populate cache on READ command when DNET_IO_FLAGS_CACHE ioflag is set
[elliptics.git] / library / node.c
blob0cd97bd447b122d10a6054b11a874f40455317d0
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/stat.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <signal.h>
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
27 static struct dnet_node *dnet_node_alloc(struct dnet_config *cfg)
29 struct dnet_node *n;
30 int err;
32 n = malloc(sizeof(struct dnet_node));
33 if (!n)
34 return NULL;
36 memset(n, 0, sizeof(struct dnet_node));
38 atomic_init(&n->trans, 0);
40 err = dnet_log_init(n, cfg->log);
41 if (err)
42 goto err_out_free;
44 err = pthread_mutex_init(&n->state_lock, NULL);
45 if (err) {
46 dnet_log_err(n, "Failed to initialize state lock: err: %d", err);
47 goto err_out_free;
50 n->wait = dnet_wait_alloc(0);
51 if (!n->wait) {
52 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate wait structure.\n");
53 goto err_out_destroy_state;
56 err = dnet_counter_init(n);
57 if (err) {
58 dnet_log_err(n, "Failed to initialize statictics counters lock: err: %d", err);
59 goto err_out_destroy_wait;
62 err = pthread_mutex_init(&n->reconnect_lock, NULL);
63 if (err) {
64 err = -err;
65 dnet_log_err(n, "Failed to initialize reconnection lock: err: %d", err);
66 goto err_out_destroy_counter;
69 err = pthread_mutex_init(&n->group_lock, NULL);
70 if (err) {
71 err = -err;
72 dnet_log_err(n, "Failed to initialize group lock: err: %d", err);
73 goto err_out_destroy_reconnect_lock;
76 err = pthread_attr_init(&n->attr);
77 if (err) {
78 err = -err;
79 dnet_log_err(n, "Failed to initialize pthread attributes: err: %d", err);
80 goto err_out_destroy_group_lock;
82 pthread_attr_setdetachstate(&n->attr, PTHREAD_CREATE_DETACHED);
84 INIT_LIST_HEAD(&n->group_list);
85 INIT_LIST_HEAD(&n->empty_state_list);
86 INIT_LIST_HEAD(&n->storage_state_list);
87 INIT_LIST_HEAD(&n->reconnect_list);
89 INIT_LIST_HEAD(&n->check_entry);
91 memcpy(n->cookie, cfg->cookie, DNET_AUTH_COOKIE_SIZE);
93 return n;
95 err_out_destroy_group_lock:
96 pthread_mutex_destroy(&n->group_lock);
97 err_out_destroy_reconnect_lock:
98 pthread_mutex_destroy(&n->reconnect_lock);
99 err_out_destroy_counter:
100 dnet_counter_destroy(n);
101 err_out_destroy_wait:
102 dnet_wait_put(n->wait);
103 err_out_destroy_state:
104 pthread_mutex_destroy(&n->state_lock);
105 err_out_free:
106 free(n);
107 return NULL;
110 static struct dnet_group *dnet_group_create(unsigned int group_id)
112 struct dnet_group *g;
114 g = malloc(sizeof(struct dnet_group));
115 if (!g)
116 return NULL;
118 memset(g, 0, sizeof(struct dnet_group));
120 atomic_init(&g->refcnt, 1);
121 g->group_id = group_id;
123 INIT_LIST_HEAD(&g->state_list);
125 g->id_num = 0;
126 g->ids = NULL;
128 return g;
131 void dnet_group_destroy(struct dnet_group *g)
133 if (!list_empty(&g->state_list)) {
134 fprintf(stderr, "BUG in dnet_group_destroy, reference leak.\n");
135 exit(-1);
137 list_del(&g->group_entry);
138 free(g->ids);
139 free(g);
142 static struct dnet_group *dnet_group_search(struct dnet_node *n, unsigned int group_id)
144 struct dnet_group *g, *found = NULL;
146 list_for_each_entry(g, &n->group_list, group_entry) {
147 if (g->group_id == group_id) {
148 found = dnet_group_get(g);
149 break;
153 return found;
156 static int dnet_idc_compare(const void *k1, const void *k2)
158 const struct dnet_state_id *id1 = k1;
159 const struct dnet_state_id *id2 = k2;
161 return dnet_id_cmp_str(id1->raw.id, id2->raw.id);
164 static void dnet_idc_remove_ids(struct dnet_net_state *st, struct dnet_group *g)
166 int i, pos;
168 for (i=0, pos=0; i<g->id_num; ++i) {
169 if (g->ids[i].idc != st->idc) {
170 g->ids[pos] = g->ids[i];
171 pos++;
175 g->id_num = pos;
177 qsort(g->ids, g->id_num, sizeof(struct dnet_state_id), dnet_idc_compare);
178 st->idc = NULL;
181 int dnet_idc_create(struct dnet_net_state *st, int group_id, struct dnet_raw_id *ids, int id_num)
183 struct dnet_node *n = st->n;
184 struct dnet_idc *idc;
185 struct dnet_group *g;
186 int err = -ENOMEM, i, num;
187 struct timeval start, end;
188 long diff;
190 gettimeofday(&start, NULL);
192 idc = malloc(sizeof(struct dnet_idc) + sizeof(struct dnet_state_id) * id_num);
193 if (!idc)
194 goto err_out_exit;
196 memset(idc, 0, sizeof(struct dnet_idc));
198 for (i=0; i<id_num; ++i) {
199 struct dnet_state_id *sid = &idc->ids[i];
200 memcpy(&sid->raw, &ids[i], sizeof(struct dnet_raw_id));
201 sid->idc = idc;
204 pthread_mutex_lock(&n->state_lock);
206 g = dnet_group_search(n, group_id);
207 if (!g) {
208 g = dnet_group_create(group_id);
209 if (!g)
210 goto err_out_unlock;
212 list_add_tail(&g->group_entry, &n->group_list);
215 g->ids = realloc(g->ids, (g->id_num + id_num) * sizeof(struct dnet_state_id));
216 if (!g->ids) {
217 g->id_num = 0;
218 goto err_out_unlock_put;
221 num = 0;
222 for (i=0; i<id_num; ++i) {
223 if (!bsearch(&idc->ids[i], g->ids, g->id_num, sizeof(struct dnet_state_id), dnet_idc_compare)) {
224 memcpy(&g->ids[g->id_num + num], &idc->ids[i], sizeof(struct dnet_state_id));
225 num++;
229 if (!num) {
230 err = -EEXIST;
231 goto err_out_unlock_put;
234 g->id_num += num;
235 qsort(g->ids, g->id_num, sizeof(struct dnet_state_id), dnet_idc_compare);
237 list_add_tail(&st->state_entry, &g->state_list);
238 list_add_tail(&st->storage_state_entry, &n->storage_state_list);
240 idc->id_num = id_num;
241 idc->st = st;
242 idc->group = g;
244 st->idc = idc;
246 if (n->log->log_level > DNET_LOG_DEBUG) {
247 for (i=0; i<g->id_num; ++i) {
248 struct dnet_state_id *id = &g->ids[i];
249 dnet_log(n, DNET_LOG_DEBUG, "%d: %s -> %s\n", g->group_id,
250 dnet_dump_id_str(id->raw.id), dnet_state_dump_addr(id->idc->st));
254 err = dnet_setup_control_nolock(st);
255 if (err)
256 goto err_out_remove_nolock;
258 pthread_mutex_unlock(&n->state_lock);
260 gettimeofday(&end, NULL);
261 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
263 dnet_log(n, DNET_LOG_NOTICE, "Initialized group: %d, total ids: %d, added ids: %d out of %d: %ld usecs.\n",
264 g->group_id, g->id_num, num, id_num, diff);
266 if (n->server_prio) {
267 err = setsockopt(st->read_s, IPPROTO_IP, IP_TOS, &n->server_prio, 4);
268 if (err) {
269 err = -errno;
270 dnet_log_err(n, "could not set read server prio %d", n->server_prio);
272 err = setsockopt(st->write_s, IPPROTO_IP, IP_TOS, &n->server_prio, 4);
273 if (err) {
274 err = -errno;
275 dnet_log_err(n, "could not set write server prio %d", n->server_prio);
278 if (!err) {
279 dnet_log(n, DNET_LOG_INFO, "%s: server net TOS value set to %d\n",
280 dnet_server_convert_dnet_addr(&st->addr), n->server_prio);
284 return 0;
286 err_out_remove_nolock:
287 dnet_idc_remove_ids(st, g);
288 list_del_init(&st->state_entry);
289 list_del_init(&st->storage_state_entry);
290 err_out_unlock_put:
291 dnet_group_put(g);
292 err_out_unlock:
293 pthread_mutex_unlock(&n->state_lock);
294 free(idc);
295 err_out_exit:
296 gettimeofday(&end, NULL);
297 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
298 dnet_log(n, DNET_LOG_ERROR, "Failed to initialized group %d with %d ids: err: %d: %ld usecs.\n", group_id, id_num, err, diff);
299 return err;
302 void dnet_idc_destroy_nolock(struct dnet_net_state *st)
304 struct dnet_idc *idc;
305 struct dnet_group *g;
307 idc = st->idc;
308 if (!idc)
309 return;
311 g = idc->group;
312 dnet_idc_remove_ids(st, g);
313 dnet_group_put(g);
314 free(idc);
317 static int __dnet_idc_search(struct dnet_group *g, struct dnet_id *id)
319 int low, high, i, cmp;
320 struct dnet_state_id *sid;
322 for (low = -1, high = g->id_num; high-low > 1; ) {
323 i = low + (high - low)/2;
324 sid = &g->ids[i];
326 cmp = dnet_id_cmp_str(sid->raw.id, id->id);
327 if (cmp < 0)
328 low = i;
329 else if (cmp > 0)
330 high = i;
331 else
332 goto out;
334 i = high - 1;
336 out:
337 if (i == -1)
338 i = g->id_num - 1;
340 return i;
343 static struct dnet_state_id *dnet_idc_search(struct dnet_group *g, struct dnet_id *id)
345 return &g->ids[__dnet_idc_search(g, id)];
348 static int dnet_search_range_nolock(struct dnet_node *n, struct dnet_id *id, struct dnet_raw_id *start, struct dnet_raw_id *next)
350 struct dnet_state_id *sid;
351 struct dnet_group *group;
352 int idc_pos;
354 group = dnet_group_search(n, id->group_id);
355 if (!group)
356 return -ENOENT;
358 idc_pos = __dnet_idc_search(group, id);
359 sid = &group->ids[idc_pos];
360 memcpy(start, &sid->raw, sizeof(struct dnet_raw_id));
362 if (++idc_pos >= group->id_num)
363 idc_pos = 0;
364 sid = &group->ids[idc_pos];
365 memcpy(next, &sid->raw, sizeof(struct dnet_raw_id));
367 dnet_group_put(group);
369 return 0;
372 int dnet_search_range(struct dnet_node *n, struct dnet_id *id, struct dnet_raw_id *start, struct dnet_raw_id *next)
374 int err;
376 pthread_mutex_lock(&n->state_lock);
377 err = dnet_search_range_nolock(n, id, start, next);
378 pthread_mutex_unlock(&n->state_lock);
380 return err;
383 static struct dnet_state_id *__dnet_state_search_id(struct dnet_node *n, struct dnet_id *id)
385 struct dnet_state_id *sid;
386 struct dnet_group *group;
388 group = dnet_group_search(n, id->group_id);
389 if (!group)
390 return NULL;
392 sid = dnet_idc_search(group, id);
394 dnet_group_put(group);
396 return sid;
399 static struct dnet_net_state *__dnet_state_search(struct dnet_node *n, struct dnet_id *id)
401 struct dnet_state_id *sid = __dnet_state_search_id(n, id);
403 if (!sid)
404 return NULL;
406 return dnet_state_get(sid->idc->st);
409 struct dnet_net_state *dnet_state_search_by_addr(struct dnet_node *n, struct dnet_addr *addr)
411 struct dnet_net_state *st, *found = NULL;
412 struct dnet_group *g;
414 pthread_mutex_lock(&n->state_lock);
415 list_for_each_entry(g, &n->group_list, group_entry) {
416 list_for_each_entry(st, &g->state_list, state_entry) {
417 if (st->addr.addr_len == addr->addr_len &&
418 !memcmp(addr, &st->addr, st->addr.addr_len)) {
419 found = st;
420 break;
423 if (found) {
424 dnet_state_get(found);
425 break;
428 pthread_mutex_unlock(&n->state_lock);
430 return found;
433 struct dnet_net_state *dnet_state_search_nolock(struct dnet_node *n, struct dnet_id *id)
435 struct dnet_net_state *found;
437 found = __dnet_state_search(n, id);
438 if (!found) {
439 struct dnet_group *g;
441 g = dnet_group_search(n, id->group_id);
442 if (!g)
443 goto err_out_exit;
445 found = dnet_state_get(g->ids[0].idc->st);
446 dnet_group_put(g);
449 err_out_exit:
450 return found;
453 struct dnet_net_state *dnet_state_get_first(struct dnet_node *n, struct dnet_id *id)
455 struct dnet_net_state *found;
457 pthread_mutex_lock(&n->state_lock);
458 found = dnet_state_search_nolock(n, id);
459 if (found == n->st) {
460 dnet_state_put(found);
461 found = NULL;
464 pthread_mutex_unlock(&n->state_lock);
466 return found;
470 * We do not blindly return n->st, since it will go away eventually,
471 * since we want multiple states/listen sockets per single node
473 struct dnet_net_state *dnet_node_state(struct dnet_node *n)
475 struct dnet_net_state *found;
477 pthread_mutex_lock(&n->state_lock);
478 found = dnet_state_search_nolock(n, &n->id);
479 pthread_mutex_unlock(&n->state_lock);
481 return found;
484 struct dnet_node *dnet_node_create(struct dnet_config *cfg)
486 struct dnet_node *n;
487 int err = -ENOMEM;
488 sigset_t sig;
490 srand(time(NULL));
492 sigfillset(&sig);
493 pthread_sigmask(SIG_BLOCK, &sig, NULL);
494 sigprocmask(SIG_BLOCK, &sig, NULL);
496 if ((cfg->flags & DNET_CFG_JOIN_NETWORK) && (!cfg->cb)) {
497 err = -EINVAL;
498 if (cfg->log && cfg->log->log)
499 cfg->log->log(cfg->log->log_private, DNET_LOG_ERROR, "Joining node has to register "
500 "a command handler.\n");
501 goto err_out_exit;
505 * Client must have SINGLE io thread num, since only this can guarantee message order
506 * Messages are picked in dnet_io_process_pool() by different threads, and it is possible that completion
507 * callbacks will be executed out of order, which will badly break things.
509 if (!cfg->io_thread_num) {
510 cfg->io_thread_num = 1;
511 if (cfg->flags & DNET_CFG_JOIN_NETWORK)
512 cfg->io_thread_num = 20;
515 if (!cfg->nonblocking_io_thread_num) {
516 cfg->nonblocking_io_thread_num = 1;
518 if (cfg->flags & DNET_CFG_JOIN_NETWORK) {
519 if (cfg->io_thread_num > 100)
520 cfg->nonblocking_io_thread_num = 10;
524 if (!cfg->net_thread_num) {
525 cfg->net_thread_num = 1;
526 if (cfg->flags & DNET_CFG_JOIN_NETWORK)
527 cfg->net_thread_num = 8;
530 n = dnet_node_alloc(cfg);
531 if (!n) {
532 err = -ENOMEM;
533 goto err_out_exit;
536 if (!cfg->sock_type)
537 cfg->sock_type = SOCK_STREAM;
538 if (!cfg->proto)
539 cfg->proto = IPPROTO_TCP;
540 if (!cfg->family)
541 cfg->family = AF_INET;
543 if (!cfg->removal_delay)
544 cfg->removal_delay = 10; /* Store removed files 10 days by default */
546 if (!cfg->oplock_num)
547 cfg->oplock_num = 1024;
549 n->proto = cfg->proto;
550 n->sock_type = cfg->sock_type;
551 n->family = cfg->family;
552 n->wait_ts.tv_sec = cfg->wait_timeout;
554 n->cb = cfg->cb;
556 n->notify_hash_size = cfg->hash_size;
557 n->check_timeout = cfg->check_timeout;
558 n->stall_count = cfg->stall_count;
559 n->id.group_id = cfg->group_id;
560 n->bg_ionice_class = cfg->bg_ionice_class;
561 n->bg_ionice_prio = cfg->bg_ionice_prio;
562 n->removal_delay = cfg->removal_delay;
563 n->flags = cfg->flags;
564 n->cache_size = cfg->cache_size;
566 if (strlen(cfg->temp_meta_env))
567 n->temp_meta_env = cfg->temp_meta_env;
568 else
569 n->temp_meta_env = cfg->history_env;
571 if (!n->log)
572 dnet_log_init(n, cfg->log);
574 dnet_log(n, DNET_LOG_INFO, "Elliptics starts\n");
576 if (!n->wait_ts.tv_sec) {
577 n->wait_ts.tv_sec = 5;
578 dnet_log(n, DNET_LOG_NOTICE, "Using default wait timeout (%ld seconds).\n",
579 n->wait_ts.tv_sec);
582 if (!n->check_timeout) {
583 n->check_timeout = DNET_DEFAULT_CHECK_TIMEOUT_SEC;
584 dnet_log(n, DNET_LOG_NOTICE, "Using default check timeout (%ld seconds).\n",
585 n->check_timeout);
588 if (!n->stall_count) {
589 n->stall_count = DNET_DEFAULT_STALL_TRANSACTIONS;
590 dnet_log(n, DNET_LOG_NOTICE, "Using default stall count (%ld transactions).\n",
591 n->stall_count);
594 n->client_prio = cfg->client_prio;
595 n->server_prio = cfg->server_prio;
597 err = dnet_crypto_init(n, cfg->ns, cfg->nsize);
598 if (err)
599 goto err_out_free;
601 err = dnet_io_init(n, cfg);
602 if (err)
603 goto err_out_crypto_cleanup;
605 err = dnet_check_thread_start(n);
606 if (err)
607 goto err_out_io_exit;
609 dnet_log(n, DNET_LOG_DEBUG, "New node has been created at %s.\n",
610 dnet_dump_node(n));
611 return n;
613 err_out_io_exit:
614 dnet_io_exit(n);
615 err_out_crypto_cleanup:
616 dnet_crypto_cleanup(n);
617 err_out_free:
618 free(n);
619 err_out_exit:
620 if (cfg->log && cfg->log->log)
621 cfg->log->log(cfg->log->log_private, DNET_LOG_ERROR, "Error during node creation.\n");
623 if (cfg->cb && cfg->cb->backend_cleanup)
624 cfg->cb->backend_cleanup(cfg->cb->command_private);
625 return NULL;
628 int dnet_need_exit(struct dnet_node *n)
630 return n->need_exit;
633 void dnet_set_need_exit(struct dnet_node *n)
635 n->need_exit = 1;
638 void dnet_node_cleanup_common_resources(struct dnet_node *n)
640 struct dnet_addr_storage *it, *atmp;
642 n->need_exit = 1;
643 dnet_check_thread_stop(n);
645 dnet_io_exit(n);
647 pthread_attr_destroy(&n->attr);
649 pthread_mutex_destroy(&n->state_lock);
650 dnet_crypto_cleanup(n);
652 list_for_each_entry_safe(it, atmp, &n->reconnect_list, reconnect_entry) {
653 list_del(&it->reconnect_entry);
654 free(it);
656 dnet_counter_destroy(n);
657 pthread_mutex_destroy(&n->reconnect_lock);
658 pthread_mutex_destroy(&n->group_lock);
660 dnet_wait_put(n->wait);
662 free(n->groups);
665 void dnet_node_destroy(struct dnet_node *n)
667 dnet_log(n, DNET_LOG_DEBUG, "Destroying node at %s, st: %p.\n",
668 dnet_dump_node(n), n->st);
670 dnet_node_cleanup_common_resources(n);
672 free(n);
675 int dnet_node_set_groups(struct dnet_node *n, int *groups, int group_num)
677 int *g, i;
679 if (groups && !group_num)
680 return -EINVAL;
681 if (group_num && !groups)
682 return -EINVAL;
684 g = malloc(group_num * sizeof(int));
685 if (!g)
686 return -ENOMEM;
688 for (i=0; i<group_num; ++i)
689 g[i] = groups[i];
691 pthread_mutex_lock(&n->group_lock);
692 free(n->groups);
694 n->groups = g;
695 n->group_num = group_num;
696 pthread_mutex_unlock(&n->group_lock);
698 return 0;
701 void dnet_set_timeouts(struct dnet_node *n, int wait_timeout, int check_timeout)
703 n->wait_ts.tv_sec = wait_timeout;
704 n->check_timeout = check_timeout;