segfault/memleak on incorrect data fixed
[elliptics.git] / library / node.c
blob851a68486689a11cfb515b0c0b3e9521c2885976
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/stat.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <signal.h>
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
27 static struct dnet_node *dnet_node_alloc(struct dnet_config *cfg)
29 struct dnet_node *n;
30 int err;
32 n = malloc(sizeof(struct dnet_node));
33 if (!n)
34 return NULL;
36 memset(n, 0, sizeof(struct dnet_node));
38 atomic_init(&n->trans, 0);
40 err = dnet_log_init(n, cfg->log);
41 if (err)
42 goto err_out_free;
44 err = pthread_mutex_init(&n->state_lock, NULL);
45 if (err) {
46 dnet_log_err(n, "Failed to initialize state lock: err: %d", err);
47 goto err_out_free;
50 n->wait = dnet_wait_alloc(0);
51 if (!n->wait) {
52 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate wait structure.\n");
53 goto err_out_destroy_state;
56 err = dnet_counter_init(n);
57 if (err) {
58 dnet_log_err(n, "Failed to initialize statictics counters lock: err: %d", err);
59 goto err_out_destroy_wait;
62 err = pthread_mutex_init(&n->reconnect_lock, NULL);
63 if (err) {
64 err = -err;
65 dnet_log_err(n, "Failed to initialize reconnection lock: err: %d", err);
66 goto err_out_destroy_counter;
69 err = pthread_attr_init(&n->attr);
70 if (err) {
71 err = -err;
72 dnet_log_err(n, "Failed to initialize pthread attributes: err: %d", err);
73 goto err_out_destroy_reconnect_lock;
75 pthread_attr_setdetachstate(&n->attr, PTHREAD_CREATE_DETACHED);
77 n->autodiscovery_socket = -1;
79 INIT_LIST_HEAD(&n->group_list);
80 INIT_LIST_HEAD(&n->empty_state_list);
81 INIT_LIST_HEAD(&n->storage_state_list);
82 INIT_LIST_HEAD(&n->reconnect_list);
84 INIT_LIST_HEAD(&n->check_entry);
86 memcpy(n->cookie, cfg->cookie, DNET_AUTH_COOKIE_SIZE);
88 return n;
90 err_out_destroy_reconnect_lock:
91 pthread_mutex_destroy(&n->reconnect_lock);
92 err_out_destroy_counter:
93 dnet_counter_destroy(n);
94 err_out_destroy_wait:
95 dnet_wait_put(n->wait);
96 err_out_destroy_state:
97 pthread_mutex_destroy(&n->state_lock);
98 err_out_free:
99 free(n);
100 return NULL;
103 static struct dnet_group *dnet_group_create(unsigned int group_id)
105 struct dnet_group *g;
107 g = malloc(sizeof(struct dnet_group));
108 if (!g)
109 return NULL;
111 memset(g, 0, sizeof(struct dnet_group));
113 atomic_init(&g->refcnt, 1);
114 g->group_id = group_id;
116 INIT_LIST_HEAD(&g->state_list);
118 g->id_num = 0;
119 g->ids = NULL;
121 return g;
124 void dnet_group_destroy(struct dnet_group *g)
126 if (!list_empty(&g->state_list)) {
127 fprintf(stderr, "BUG in dnet_group_destroy, reference leak.\n");
128 exit(-1);
130 list_del(&g->group_entry);
131 free(g->ids);
132 free(g);
135 static struct dnet_group *dnet_group_search(struct dnet_node *n, unsigned int group_id)
137 struct dnet_group *g, *found = NULL;
139 list_for_each_entry(g, &n->group_list, group_entry) {
140 if (g->group_id == group_id) {
141 found = dnet_group_get(g);
142 break;
146 return found;
149 static int dnet_idc_compare(const void *k1, const void *k2)
151 const struct dnet_state_id *id1 = k1;
152 const struct dnet_state_id *id2 = k2;
154 return dnet_id_cmp_str(id1->raw.id, id2->raw.id);
157 static void dnet_idc_remove_ids(struct dnet_net_state *st, struct dnet_group *g)
159 int i, pos;
161 for (i=0, pos=0; i<g->id_num; ++i) {
162 if (g->ids[i].idc != st->idc) {
163 g->ids[pos] = g->ids[i];
164 pos++;
168 g->id_num = pos;
170 qsort(g->ids, g->id_num, sizeof(struct dnet_state_id), dnet_idc_compare);
171 st->idc = NULL;
174 int dnet_idc_create(struct dnet_net_state *st, int group_id, struct dnet_raw_id *ids, int id_num)
176 struct dnet_node *n = st->n;
177 struct dnet_idc *idc;
178 struct dnet_group *g;
179 int err = -ENOMEM, i, num;
180 struct timeval start, end;
181 long diff;
183 gettimeofday(&start, NULL);
185 idc = malloc(sizeof(struct dnet_idc) + sizeof(struct dnet_state_id) * id_num);
186 if (!idc)
187 goto err_out_exit;
189 memset(idc, 0, sizeof(struct dnet_idc));
191 for (i=0; i<id_num; ++i) {
192 struct dnet_state_id *sid = &idc->ids[i];
193 memcpy(&sid->raw, &ids[i], sizeof(struct dnet_raw_id));
194 sid->idc = idc;
197 pthread_mutex_lock(&n->state_lock);
199 g = dnet_group_search(n, group_id);
200 if (!g) {
201 g = dnet_group_create(group_id);
202 if (!g)
203 goto err_out_unlock;
205 list_add_tail(&g->group_entry, &n->group_list);
208 g->ids = realloc(g->ids, (g->id_num + id_num) * sizeof(struct dnet_state_id));
209 if (!g->ids) {
210 g->id_num = 0;
211 goto err_out_unlock_put;
214 num = 0;
215 for (i=0; i<id_num; ++i) {
216 if (!bsearch(&idc->ids[i], g->ids, g->id_num, sizeof(struct dnet_state_id), dnet_idc_compare)) {
217 memcpy(&g->ids[g->id_num + num], &idc->ids[i], sizeof(struct dnet_state_id));
218 num++;
222 if (!num) {
223 err = -EEXIST;
224 goto err_out_unlock_put;
227 g->id_num += num;
228 qsort(g->ids, g->id_num, sizeof(struct dnet_state_id), dnet_idc_compare);
230 list_add_tail(&st->state_entry, &g->state_list);
231 list_add_tail(&st->storage_state_entry, &n->storage_state_list);
233 idc->id_num = id_num;
234 idc->st = st;
235 idc->group = g;
237 st->idc = idc;
239 if (n->log->log_level > DNET_LOG_DEBUG) {
240 for (i=0; i<g->id_num; ++i) {
241 struct dnet_state_id *id = &g->ids[i];
242 dnet_log(n, DNET_LOG_DEBUG, "%d: %s -> %s\n", g->group_id,
243 dnet_dump_id_str(id->raw.id), dnet_state_dump_addr(id->idc->st));
247 err = dnet_setup_control_nolock(st);
248 if (err)
249 goto err_out_remove_nolock;
251 pthread_mutex_unlock(&n->state_lock);
253 gettimeofday(&end, NULL);
254 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
256 dnet_log(n, DNET_LOG_NOTICE, "Initialized group: %d, total ids: %d, added ids: %d out of %d: %ld usecs.\n",
257 g->group_id, g->id_num, num, id_num, diff);
259 if (n->server_prio) {
260 err = setsockopt(st->read_s, IPPROTO_IP, IP_TOS, &n->server_prio, 4);
261 if (err) {
262 err = -errno;
263 dnet_log_err(n, "could not set read server prio %d", n->server_prio);
265 err = setsockopt(st->write_s, IPPROTO_IP, IP_TOS, &n->server_prio, 4);
266 if (err) {
267 err = -errno;
268 dnet_log_err(n, "could not set write server prio %d", n->server_prio);
271 if (!err) {
272 dnet_log(n, DNET_LOG_INFO, "%s: server net TOS value set to %d\n",
273 dnet_server_convert_dnet_addr(&st->addr), n->server_prio);
277 return 0;
279 err_out_remove_nolock:
280 dnet_idc_remove_ids(st, g);
281 list_del_init(&st->state_entry);
282 list_del_init(&st->storage_state_entry);
283 err_out_unlock_put:
284 dnet_group_put(g);
285 err_out_unlock:
286 pthread_mutex_unlock(&n->state_lock);
287 free(idc);
288 err_out_exit:
289 gettimeofday(&end, NULL);
290 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
291 dnet_log(n, DNET_LOG_ERROR, "Failed to initialized group %d with %d ids: err: %d: %ld usecs.\n", group_id, id_num, err, diff);
292 return err;
295 void dnet_idc_destroy_nolock(struct dnet_net_state *st)
297 struct dnet_idc *idc;
298 struct dnet_group *g;
300 idc = st->idc;
301 if (!idc)
302 return;
304 g = idc->group;
305 dnet_idc_remove_ids(st, g);
306 dnet_group_put(g);
307 free(idc);
310 static int __dnet_idc_search(struct dnet_group *g, struct dnet_id *id)
312 int low, high, i, cmp;
313 struct dnet_state_id *sid;
315 for (low = -1, high = g->id_num; high-low > 1; ) {
316 i = low + (high - low)/2;
317 sid = &g->ids[i];
319 cmp = dnet_id_cmp_str(sid->raw.id, id->id);
320 if (cmp < 0)
321 low = i;
322 else if (cmp > 0)
323 high = i;
324 else
325 goto out;
327 i = high - 1;
329 out:
330 if (i == -1)
331 i = g->id_num - 1;
333 return i;
336 static struct dnet_state_id *dnet_idc_search(struct dnet_group *g, struct dnet_id *id)
338 return &g->ids[__dnet_idc_search(g, id)];
341 static int dnet_search_range_nolock(struct dnet_node *n, struct dnet_id *id, struct dnet_raw_id *start, struct dnet_raw_id *next)
343 struct dnet_state_id *sid;
344 struct dnet_group *group;
345 int idc_pos;
347 group = dnet_group_search(n, id->group_id);
348 if (!group)
349 return -ENOENT;
351 idc_pos = __dnet_idc_search(group, id);
352 sid = &group->ids[idc_pos];
353 memcpy(start, &sid->raw, sizeof(struct dnet_raw_id));
355 if (++idc_pos >= group->id_num)
356 idc_pos = 0;
357 sid = &group->ids[idc_pos];
358 memcpy(next, &sid->raw, sizeof(struct dnet_raw_id));
360 dnet_group_put(group);
362 return 0;
365 int dnet_search_range(struct dnet_node *n, struct dnet_id *id, struct dnet_raw_id *start, struct dnet_raw_id *next)
367 int err;
369 pthread_mutex_lock(&n->state_lock);
370 err = dnet_search_range_nolock(n, id, start, next);
371 pthread_mutex_unlock(&n->state_lock);
373 return err;
376 static struct dnet_state_id *__dnet_state_search_id(struct dnet_node *n, struct dnet_id *id)
378 struct dnet_state_id *sid;
379 struct dnet_group *group;
381 group = dnet_group_search(n, id->group_id);
382 if (!group)
383 return NULL;
385 sid = dnet_idc_search(group, id);
387 dnet_group_put(group);
389 return sid;
392 static struct dnet_net_state *__dnet_state_search(struct dnet_node *n, struct dnet_id *id)
394 struct dnet_state_id *sid = __dnet_state_search_id(n, id);
396 if (!sid)
397 return NULL;
399 return dnet_state_get(sid->idc->st);
402 struct dnet_net_state *dnet_state_search_by_addr(struct dnet_node *n, struct dnet_addr *addr)
404 struct dnet_net_state *st, *found = NULL;
405 struct dnet_group *g;
407 pthread_mutex_lock(&n->state_lock);
408 list_for_each_entry(g, &n->group_list, group_entry) {
409 list_for_each_entry(st, &g->state_list, state_entry) {
410 if (st->addr.addr_len == addr->addr_len &&
411 !memcmp(addr, &st->addr, st->addr.addr_len)) {
412 found = st;
413 break;
416 if (found) {
417 dnet_state_get(found);
418 break;
421 pthread_mutex_unlock(&n->state_lock);
423 return found;
426 struct dnet_net_state *dnet_state_search_nolock(struct dnet_node *n, struct dnet_id *id)
428 struct dnet_net_state *found;
430 found = __dnet_state_search(n, id);
431 if (!found) {
432 struct dnet_group *g;
434 g = dnet_group_search(n, id->group_id);
435 if (!g)
436 goto err_out_exit;
438 found = dnet_state_get(g->ids[0].idc->st);
439 dnet_group_put(g);
442 err_out_exit:
443 return found;
446 struct dnet_net_state *dnet_state_get_first(struct dnet_node *n, struct dnet_id *id)
448 struct dnet_net_state *found;
450 pthread_mutex_lock(&n->state_lock);
451 found = dnet_state_search_nolock(n, id);
452 if (found == n->st) {
453 dnet_state_put(found);
454 found = NULL;
457 pthread_mutex_unlock(&n->state_lock);
459 return found;
463 * We do not blindly return n->st, since it will go away eventually,
464 * since we want multiple states/listen sockets per single node
466 struct dnet_net_state *dnet_node_state(struct dnet_node *n)
468 struct dnet_net_state *found;
470 pthread_mutex_lock(&n->state_lock);
471 found = dnet_state_search_nolock(n, &n->id);
472 pthread_mutex_unlock(&n->state_lock);
474 return found;
477 struct dnet_node *dnet_node_create(struct dnet_config *cfg)
479 struct dnet_node *n;
480 int err = -ENOMEM;
482 srand(time(NULL));
484 if ((cfg->flags & DNET_CFG_JOIN_NETWORK) && (!cfg->cb)) {
485 err = -EINVAL;
486 if (cfg->log && cfg->log->log)
487 cfg->log->log(cfg->log->log_private, DNET_LOG_ERROR, "Joining node has to register "
488 "a command handler.\n");
489 goto err_out_exit;
493 * Client must have SINGLE io thread num, since only this can guarantee message order
494 * Messages are picked in dnet_io_process_pool() by different threads, and it is possible that completion
495 * callbacks will be executed out of order, which will badly break things.
497 if (!cfg->io_thread_num) {
498 cfg->io_thread_num = 1;
499 if (cfg->flags & DNET_CFG_JOIN_NETWORK)
500 cfg->io_thread_num = 20;
503 if (!cfg->nonblocking_io_thread_num) {
504 cfg->nonblocking_io_thread_num = 1;
506 if (cfg->flags & DNET_CFG_JOIN_NETWORK) {
507 if (cfg->io_thread_num > 100)
508 cfg->nonblocking_io_thread_num = 10;
512 if (!cfg->net_thread_num) {
513 cfg->net_thread_num = 1;
514 if (cfg->flags & DNET_CFG_JOIN_NETWORK)
515 cfg->net_thread_num = 8;
518 n = dnet_node_alloc(cfg);
519 if (!n) {
520 err = -ENOMEM;
521 goto err_out_exit;
524 if (!cfg->sock_type)
525 cfg->sock_type = SOCK_STREAM;
526 if (!cfg->proto)
527 cfg->proto = IPPROTO_TCP;
528 if (!cfg->family)
529 cfg->family = AF_INET;
531 if (!cfg->removal_delay)
532 cfg->removal_delay = 10; /* Store removed files 10 days by default */
534 if (!cfg->oplock_num)
535 cfg->oplock_num = 1024;
537 n->proto = cfg->proto;
538 n->sock_type = cfg->sock_type;
539 n->family = cfg->family;
540 n->wait_ts.tv_sec = cfg->wait_timeout;
542 n->cb = cfg->cb;
544 n->notify_hash_size = cfg->hash_size;
545 n->check_timeout = cfg->check_timeout;
546 n->stall_count = cfg->stall_count;
547 n->id.group_id = cfg->group_id;
548 n->bg_ionice_class = cfg->bg_ionice_class;
549 n->bg_ionice_prio = cfg->bg_ionice_prio;
550 n->removal_delay = cfg->removal_delay;
551 n->flags = cfg->flags;
552 n->cache_size = cfg->cache_size;
554 if (strlen(cfg->temp_meta_env))
555 n->temp_meta_env = cfg->temp_meta_env;
556 else
557 n->temp_meta_env = cfg->history_env;
559 if (!n->log)
560 dnet_log_init(n, cfg->log);
562 dnet_log(n, DNET_LOG_INFO, "Elliptics starts\n");
564 if (!n->wait_ts.tv_sec) {
565 n->wait_ts.tv_sec = 5;
566 dnet_log(n, DNET_LOG_NOTICE, "Using default wait timeout (%ld seconds).\n",
567 n->wait_ts.tv_sec);
570 if (!n->check_timeout) {
571 n->check_timeout = DNET_DEFAULT_CHECK_TIMEOUT_SEC;
572 dnet_log(n, DNET_LOG_NOTICE, "Using default check timeout (%ld seconds).\n",
573 n->check_timeout);
576 if (!n->stall_count) {
577 n->stall_count = DNET_DEFAULT_STALL_TRANSACTIONS;
578 dnet_log(n, DNET_LOG_NOTICE, "Using default stall count (%ld transactions).\n",
579 n->stall_count);
582 n->client_prio = cfg->client_prio;
583 n->server_prio = cfg->server_prio;
585 err = dnet_crypto_init(n, cfg->ns, cfg->nsize);
586 if (err)
587 goto err_out_free;
589 err = dnet_io_init(n, cfg);
590 if (err)
591 goto err_out_crypto_cleanup;
593 err = dnet_check_thread_start(n);
594 if (err)
595 goto err_out_io_exit;
597 dnet_log(n, DNET_LOG_DEBUG, "New node has been created at %s.\n", dnet_dump_node(n));
598 return n;
600 err_out_io_exit:
601 dnet_io_exit(n);
602 err_out_crypto_cleanup:
603 dnet_crypto_cleanup(n);
604 err_out_free:
605 free(n);
606 err_out_exit:
607 if (cfg->log && cfg->log->log)
608 cfg->log->log(cfg->log->log_private, DNET_LOG_ERROR, "Error during node creation.\n");
610 if (cfg->cb && cfg->cb->backend_cleanup)
611 cfg->cb->backend_cleanup(cfg->cb->command_private);
612 return NULL;
615 int dnet_need_exit(struct dnet_node *n)
617 return n->need_exit;
620 void dnet_set_need_exit(struct dnet_node *n)
622 n->need_exit = 1;
625 void dnet_node_cleanup_common_resources(struct dnet_node *n)
627 struct dnet_addr_storage *it, *atmp;
629 n->need_exit = 1;
630 dnet_check_thread_stop(n);
632 dnet_io_exit(n);
634 pthread_attr_destroy(&n->attr);
636 pthread_mutex_destroy(&n->state_lock);
637 dnet_crypto_cleanup(n);
639 list_for_each_entry_safe(it, atmp, &n->reconnect_list, reconnect_entry) {
640 list_del(&it->reconnect_entry);
641 free(it);
643 dnet_counter_destroy(n);
644 pthread_mutex_destroy(&n->reconnect_lock);
646 dnet_wait_put(n->wait);
648 close(n->autodiscovery_socket);
651 void dnet_node_destroy(struct dnet_node *n)
653 dnet_log(n, DNET_LOG_DEBUG, "Destroying node at %s, st: %p.\n",
654 dnet_dump_node(n), n->st);
656 dnet_node_cleanup_common_resources(n);
658 free(n);
661 struct dnet_session *dnet_session_create(struct dnet_node *n)
663 struct dnet_session *s;
665 s = (struct dnet_session *)malloc(sizeof(struct dnet_session));
666 if (!s)
667 return NULL;
669 s->node = n;
670 s->group_num = 0;
671 s->groups = NULL;
673 return s;
676 void dnet_session_destroy(struct dnet_session *s)
678 dnet_log(s->node, DNET_LOG_DEBUG, "Destroying session at %s, st: %p.\n",
679 dnet_dump_node(s->node), s->node->st);
680 free(s);
683 int dnet_session_set_groups(struct dnet_session *s, int *groups, int group_num)
685 int *g, i;
687 if (groups && !group_num)
688 return -EINVAL;
689 if (group_num && !groups)
690 return -EINVAL;
692 g = malloc(group_num * sizeof(int));
693 if (!g)
694 return -ENOMEM;
696 for (i=0; i<group_num; ++i)
697 g[i] = groups[i];
699 free(s->groups);
701 s->groups = g;
702 s->group_num = group_num;
704 return 0;
707 void dnet_set_timeouts(struct dnet_node *n, int wait_timeout, int check_timeout)
709 n->wait_ts.tv_sec = wait_timeout;
710 n->check_timeout = check_timeout;