Return correct reply size: data+binary, not data+event
[elliptics.git] / library / node.c
blobad9e53e02db6cf27da26d8979bc3616df2672028
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/stat.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <signal.h>
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
27 static struct dnet_node *dnet_node_alloc(struct dnet_config *cfg)
29 struct dnet_node *n;
30 int err;
32 n = malloc(sizeof(struct dnet_node));
33 if (!n)
34 return NULL;
36 memset(n, 0, sizeof(struct dnet_node));
38 atomic_init(&n->trans, 0);
40 err = dnet_log_init(n, cfg->log);
41 if (err)
42 goto err_out_free;
44 err = pthread_mutex_init(&n->state_lock, NULL);
45 if (err) {
46 dnet_log_err(n, "Failed to initialize state lock: err: %d", err);
47 goto err_out_free;
50 n->wait = dnet_wait_alloc(0);
51 if (!n->wait) {
52 dnet_log(n, DNET_LOG_ERROR, "Failed to allocate wait structure.\n");
53 goto err_out_destroy_state;
56 err = dnet_counter_init(n);
57 if (err) {
58 dnet_log_err(n, "Failed to initialize statictics counters lock: err: %d", err);
59 goto err_out_destroy_wait;
62 err = pthread_mutex_init(&n->reconnect_lock, NULL);
63 if (err) {
64 err = -err;
65 dnet_log_err(n, "Failed to initialize reconnection lock: err: %d", err);
66 goto err_out_destroy_counter;
69 err = pthread_mutex_init(&n->group_lock, NULL);
70 if (err) {
71 err = -err;
72 dnet_log_err(n, "Failed to initialize group lock: err: %d", err);
73 goto err_out_destroy_reconnect_lock;
76 err = pthread_attr_init(&n->attr);
77 if (err) {
78 err = -err;
79 dnet_log_err(n, "Failed to initialize pthread attributes: err: %d", err);
80 goto err_out_destroy_group_lock;
82 pthread_attr_setdetachstate(&n->attr, PTHREAD_CREATE_DETACHED);
84 n->autodiscovery_socket = -1;
86 INIT_LIST_HEAD(&n->group_list);
87 INIT_LIST_HEAD(&n->empty_state_list);
88 INIT_LIST_HEAD(&n->storage_state_list);
89 INIT_LIST_HEAD(&n->reconnect_list);
91 INIT_LIST_HEAD(&n->check_entry);
93 memcpy(n->cookie, cfg->cookie, DNET_AUTH_COOKIE_SIZE);
95 return n;
97 err_out_destroy_group_lock:
98 pthread_mutex_destroy(&n->group_lock);
99 err_out_destroy_reconnect_lock:
100 pthread_mutex_destroy(&n->reconnect_lock);
101 err_out_destroy_counter:
102 dnet_counter_destroy(n);
103 err_out_destroy_wait:
104 dnet_wait_put(n->wait);
105 err_out_destroy_state:
106 pthread_mutex_destroy(&n->state_lock);
107 err_out_free:
108 free(n);
109 return NULL;
112 static struct dnet_group *dnet_group_create(unsigned int group_id)
114 struct dnet_group *g;
116 g = malloc(sizeof(struct dnet_group));
117 if (!g)
118 return NULL;
120 memset(g, 0, sizeof(struct dnet_group));
122 atomic_init(&g->refcnt, 1);
123 g->group_id = group_id;
125 INIT_LIST_HEAD(&g->state_list);
127 g->id_num = 0;
128 g->ids = NULL;
130 return g;
133 void dnet_group_destroy(struct dnet_group *g)
135 if (!list_empty(&g->state_list)) {
136 fprintf(stderr, "BUG in dnet_group_destroy, reference leak.\n");
137 exit(-1);
139 list_del(&g->group_entry);
140 free(g->ids);
141 free(g);
144 static struct dnet_group *dnet_group_search(struct dnet_node *n, unsigned int group_id)
146 struct dnet_group *g, *found = NULL;
148 list_for_each_entry(g, &n->group_list, group_entry) {
149 if (g->group_id == group_id) {
150 found = dnet_group_get(g);
151 break;
155 return found;
158 static int dnet_idc_compare(const void *k1, const void *k2)
160 const struct dnet_state_id *id1 = k1;
161 const struct dnet_state_id *id2 = k2;
163 return dnet_id_cmp_str(id1->raw.id, id2->raw.id);
166 static void dnet_idc_remove_ids(struct dnet_net_state *st, struct dnet_group *g)
168 int i, pos;
170 for (i=0, pos=0; i<g->id_num; ++i) {
171 if (g->ids[i].idc != st->idc) {
172 g->ids[pos] = g->ids[i];
173 pos++;
177 g->id_num = pos;
179 qsort(g->ids, g->id_num, sizeof(struct dnet_state_id), dnet_idc_compare);
180 st->idc = NULL;
183 int dnet_idc_create(struct dnet_net_state *st, int group_id, struct dnet_raw_id *ids, int id_num)
185 struct dnet_node *n = st->n;
186 struct dnet_idc *idc;
187 struct dnet_group *g;
188 int err = -ENOMEM, i, num;
189 struct timeval start, end;
190 long diff;
192 gettimeofday(&start, NULL);
194 idc = malloc(sizeof(struct dnet_idc) + sizeof(struct dnet_state_id) * id_num);
195 if (!idc)
196 goto err_out_exit;
198 memset(idc, 0, sizeof(struct dnet_idc));
200 for (i=0; i<id_num; ++i) {
201 struct dnet_state_id *sid = &idc->ids[i];
202 memcpy(&sid->raw, &ids[i], sizeof(struct dnet_raw_id));
203 sid->idc = idc;
206 pthread_mutex_lock(&n->state_lock);
208 g = dnet_group_search(n, group_id);
209 if (!g) {
210 g = dnet_group_create(group_id);
211 if (!g)
212 goto err_out_unlock;
214 list_add_tail(&g->group_entry, &n->group_list);
217 g->ids = realloc(g->ids, (g->id_num + id_num) * sizeof(struct dnet_state_id));
218 if (!g->ids) {
219 g->id_num = 0;
220 goto err_out_unlock_put;
223 num = 0;
224 for (i=0; i<id_num; ++i) {
225 if (!bsearch(&idc->ids[i], g->ids, g->id_num, sizeof(struct dnet_state_id), dnet_idc_compare)) {
226 memcpy(&g->ids[g->id_num + num], &idc->ids[i], sizeof(struct dnet_state_id));
227 num++;
231 if (!num) {
232 err = -EEXIST;
233 goto err_out_unlock_put;
236 g->id_num += num;
237 qsort(g->ids, g->id_num, sizeof(struct dnet_state_id), dnet_idc_compare);
239 list_add_tail(&st->state_entry, &g->state_list);
240 list_add_tail(&st->storage_state_entry, &n->storage_state_list);
242 idc->id_num = id_num;
243 idc->st = st;
244 idc->group = g;
246 st->idc = idc;
248 if (n->log->log_level > DNET_LOG_DEBUG) {
249 for (i=0; i<g->id_num; ++i) {
250 struct dnet_state_id *id = &g->ids[i];
251 dnet_log(n, DNET_LOG_DEBUG, "%d: %s -> %s\n", g->group_id,
252 dnet_dump_id_str(id->raw.id), dnet_state_dump_addr(id->idc->st));
256 err = dnet_setup_control_nolock(st);
257 if (err)
258 goto err_out_remove_nolock;
260 pthread_mutex_unlock(&n->state_lock);
262 gettimeofday(&end, NULL);
263 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
265 dnet_log(n, DNET_LOG_NOTICE, "Initialized group: %d, total ids: %d, added ids: %d out of %d: %ld usecs.\n",
266 g->group_id, g->id_num, num, id_num, diff);
268 if (n->server_prio) {
269 err = setsockopt(st->read_s, IPPROTO_IP, IP_TOS, &n->server_prio, 4);
270 if (err) {
271 err = -errno;
272 dnet_log_err(n, "could not set read server prio %d", n->server_prio);
274 err = setsockopt(st->write_s, IPPROTO_IP, IP_TOS, &n->server_prio, 4);
275 if (err) {
276 err = -errno;
277 dnet_log_err(n, "could not set write server prio %d", n->server_prio);
280 if (!err) {
281 dnet_log(n, DNET_LOG_INFO, "%s: server net TOS value set to %d\n",
282 dnet_server_convert_dnet_addr(&st->addr), n->server_prio);
286 return 0;
288 err_out_remove_nolock:
289 dnet_idc_remove_ids(st, g);
290 list_del_init(&st->state_entry);
291 list_del_init(&st->storage_state_entry);
292 err_out_unlock_put:
293 dnet_group_put(g);
294 err_out_unlock:
295 pthread_mutex_unlock(&n->state_lock);
296 free(idc);
297 err_out_exit:
298 gettimeofday(&end, NULL);
299 diff = (end.tv_sec - start.tv_sec) * 1000000 + end.tv_usec - start.tv_usec;
300 dnet_log(n, DNET_LOG_ERROR, "Failed to initialized group %d with %d ids: err: %d: %ld usecs.\n", group_id, id_num, err, diff);
301 return err;
304 void dnet_idc_destroy_nolock(struct dnet_net_state *st)
306 struct dnet_idc *idc;
307 struct dnet_group *g;
309 idc = st->idc;
310 if (!idc)
311 return;
313 g = idc->group;
314 dnet_idc_remove_ids(st, g);
315 dnet_group_put(g);
316 free(idc);
319 static int __dnet_idc_search(struct dnet_group *g, struct dnet_id *id)
321 int low, high, i, cmp;
322 struct dnet_state_id *sid;
324 for (low = -1, high = g->id_num; high-low > 1; ) {
325 i = low + (high - low)/2;
326 sid = &g->ids[i];
328 cmp = dnet_id_cmp_str(sid->raw.id, id->id);
329 if (cmp < 0)
330 low = i;
331 else if (cmp > 0)
332 high = i;
333 else
334 goto out;
336 i = high - 1;
338 out:
339 if (i == -1)
340 i = g->id_num - 1;
342 return i;
345 static struct dnet_state_id *dnet_idc_search(struct dnet_group *g, struct dnet_id *id)
347 return &g->ids[__dnet_idc_search(g, id)];
350 static int dnet_search_range_nolock(struct dnet_node *n, struct dnet_id *id, struct dnet_raw_id *start, struct dnet_raw_id *next)
352 struct dnet_state_id *sid;
353 struct dnet_group *group;
354 int idc_pos;
356 group = dnet_group_search(n, id->group_id);
357 if (!group)
358 return -ENOENT;
360 idc_pos = __dnet_idc_search(group, id);
361 sid = &group->ids[idc_pos];
362 memcpy(start, &sid->raw, sizeof(struct dnet_raw_id));
364 if (++idc_pos >= group->id_num)
365 idc_pos = 0;
366 sid = &group->ids[idc_pos];
367 memcpy(next, &sid->raw, sizeof(struct dnet_raw_id));
369 dnet_group_put(group);
371 return 0;
374 int dnet_search_range(struct dnet_node *n, struct dnet_id *id, struct dnet_raw_id *start, struct dnet_raw_id *next)
376 int err;
378 pthread_mutex_lock(&n->state_lock);
379 err = dnet_search_range_nolock(n, id, start, next);
380 pthread_mutex_unlock(&n->state_lock);
382 return err;
385 static struct dnet_state_id *__dnet_state_search_id(struct dnet_node *n, struct dnet_id *id)
387 struct dnet_state_id *sid;
388 struct dnet_group *group;
390 group = dnet_group_search(n, id->group_id);
391 if (!group)
392 return NULL;
394 sid = dnet_idc_search(group, id);
396 dnet_group_put(group);
398 return sid;
401 static struct dnet_net_state *__dnet_state_search(struct dnet_node *n, struct dnet_id *id)
403 struct dnet_state_id *sid = __dnet_state_search_id(n, id);
405 if (!sid)
406 return NULL;
408 return dnet_state_get(sid->idc->st);
411 struct dnet_net_state *dnet_state_search_by_addr(struct dnet_node *n, struct dnet_addr *addr)
413 struct dnet_net_state *st, *found = NULL;
414 struct dnet_group *g;
416 pthread_mutex_lock(&n->state_lock);
417 list_for_each_entry(g, &n->group_list, group_entry) {
418 list_for_each_entry(st, &g->state_list, state_entry) {
419 if (st->addr.addr_len == addr->addr_len &&
420 !memcmp(addr, &st->addr, st->addr.addr_len)) {
421 found = st;
422 break;
425 if (found) {
426 dnet_state_get(found);
427 break;
430 pthread_mutex_unlock(&n->state_lock);
432 return found;
435 struct dnet_net_state *dnet_state_search_nolock(struct dnet_node *n, struct dnet_id *id)
437 struct dnet_net_state *found;
439 found = __dnet_state_search(n, id);
440 if (!found) {
441 struct dnet_group *g;
443 g = dnet_group_search(n, id->group_id);
444 if (!g)
445 goto err_out_exit;
447 found = dnet_state_get(g->ids[0].idc->st);
448 dnet_group_put(g);
451 err_out_exit:
452 return found;
455 struct dnet_net_state *dnet_state_get_first(struct dnet_node *n, struct dnet_id *id)
457 struct dnet_net_state *found;
459 pthread_mutex_lock(&n->state_lock);
460 found = dnet_state_search_nolock(n, id);
461 if (found == n->st) {
462 dnet_state_put(found);
463 found = NULL;
466 pthread_mutex_unlock(&n->state_lock);
468 return found;
472 * We do not blindly return n->st, since it will go away eventually,
473 * since we want multiple states/listen sockets per single node
475 struct dnet_net_state *dnet_node_state(struct dnet_node *n)
477 struct dnet_net_state *found;
479 pthread_mutex_lock(&n->state_lock);
480 found = dnet_state_search_nolock(n, &n->id);
481 pthread_mutex_unlock(&n->state_lock);
483 return found;
486 struct dnet_node *dnet_node_create(struct dnet_config *cfg)
488 struct dnet_node *n;
489 int err = -ENOMEM;
490 sigset_t sig;
492 srand(time(NULL));
494 sigfillset(&sig);
495 pthread_sigmask(SIG_BLOCK, &sig, NULL);
496 sigprocmask(SIG_BLOCK, &sig, NULL);
498 if ((cfg->flags & DNET_CFG_JOIN_NETWORK) && (!cfg->cb)) {
499 err = -EINVAL;
500 if (cfg->log && cfg->log->log)
501 cfg->log->log(cfg->log->log_private, DNET_LOG_ERROR, "Joining node has to register "
502 "a command handler.\n");
503 goto err_out_exit;
507 * Client must have SINGLE io thread num, since only this can guarantee message order
508 * Messages are picked in dnet_io_process_pool() by different threads, and it is possible that completion
509 * callbacks will be executed out of order, which will badly break things.
511 if (!cfg->io_thread_num) {
512 cfg->io_thread_num = 1;
513 if (cfg->flags & DNET_CFG_JOIN_NETWORK)
514 cfg->io_thread_num = 20;
517 if (!cfg->nonblocking_io_thread_num) {
518 cfg->nonblocking_io_thread_num = 1;
520 if (cfg->flags & DNET_CFG_JOIN_NETWORK) {
521 if (cfg->io_thread_num > 100)
522 cfg->nonblocking_io_thread_num = 10;
526 if (!cfg->net_thread_num) {
527 cfg->net_thread_num = 1;
528 if (cfg->flags & DNET_CFG_JOIN_NETWORK)
529 cfg->net_thread_num = 8;
532 n = dnet_node_alloc(cfg);
533 if (!n) {
534 err = -ENOMEM;
535 goto err_out_exit;
538 if (!cfg->sock_type)
539 cfg->sock_type = SOCK_STREAM;
540 if (!cfg->proto)
541 cfg->proto = IPPROTO_TCP;
542 if (!cfg->family)
543 cfg->family = AF_INET;
545 if (!cfg->removal_delay)
546 cfg->removal_delay = 10; /* Store removed files 10 days by default */
548 if (!cfg->oplock_num)
549 cfg->oplock_num = 1024;
551 n->proto = cfg->proto;
552 n->sock_type = cfg->sock_type;
553 n->family = cfg->family;
554 n->wait_ts.tv_sec = cfg->wait_timeout;
556 n->cb = cfg->cb;
558 n->notify_hash_size = cfg->hash_size;
559 n->check_timeout = cfg->check_timeout;
560 n->stall_count = cfg->stall_count;
561 n->id.group_id = cfg->group_id;
562 n->bg_ionice_class = cfg->bg_ionice_class;
563 n->bg_ionice_prio = cfg->bg_ionice_prio;
564 n->removal_delay = cfg->removal_delay;
565 n->flags = cfg->flags;
566 n->cache_size = cfg->cache_size;
568 if (strlen(cfg->temp_meta_env))
569 n->temp_meta_env = cfg->temp_meta_env;
570 else
571 n->temp_meta_env = cfg->history_env;
573 if (!n->log)
574 dnet_log_init(n, cfg->log);
576 dnet_log(n, DNET_LOG_INFO, "Elliptics starts\n");
578 if (!n->wait_ts.tv_sec) {
579 n->wait_ts.tv_sec = 5;
580 dnet_log(n, DNET_LOG_NOTICE, "Using default wait timeout (%ld seconds).\n",
581 n->wait_ts.tv_sec);
584 if (!n->check_timeout) {
585 n->check_timeout = DNET_DEFAULT_CHECK_TIMEOUT_SEC;
586 dnet_log(n, DNET_LOG_NOTICE, "Using default check timeout (%ld seconds).\n",
587 n->check_timeout);
590 if (!n->stall_count) {
591 n->stall_count = DNET_DEFAULT_STALL_TRANSACTIONS;
592 dnet_log(n, DNET_LOG_NOTICE, "Using default stall count (%ld transactions).\n",
593 n->stall_count);
596 n->client_prio = cfg->client_prio;
597 n->server_prio = cfg->server_prio;
599 err = dnet_crypto_init(n, cfg->ns, cfg->nsize);
600 if (err)
601 goto err_out_free;
603 err = dnet_io_init(n, cfg);
604 if (err)
605 goto err_out_crypto_cleanup;
607 err = dnet_check_thread_start(n);
608 if (err)
609 goto err_out_io_exit;
611 dnet_log(n, DNET_LOG_DEBUG, "New node has been created at %s.\n", dnet_dump_node(n));
612 return n;
614 err_out_io_exit:
615 dnet_io_exit(n);
616 err_out_crypto_cleanup:
617 dnet_crypto_cleanup(n);
618 err_out_free:
619 free(n);
620 err_out_exit:
621 if (cfg->log && cfg->log->log)
622 cfg->log->log(cfg->log->log_private, DNET_LOG_ERROR, "Error during node creation.\n");
624 if (cfg->cb && cfg->cb->backend_cleanup)
625 cfg->cb->backend_cleanup(cfg->cb->command_private);
626 return NULL;
629 int dnet_need_exit(struct dnet_node *n)
631 return n->need_exit;
634 void dnet_set_need_exit(struct dnet_node *n)
636 n->need_exit = 1;
639 void dnet_node_cleanup_common_resources(struct dnet_node *n)
641 struct dnet_addr_storage *it, *atmp;
643 n->need_exit = 1;
644 dnet_check_thread_stop(n);
646 dnet_io_exit(n);
648 pthread_attr_destroy(&n->attr);
650 pthread_mutex_destroy(&n->state_lock);
651 dnet_crypto_cleanup(n);
653 list_for_each_entry_safe(it, atmp, &n->reconnect_list, reconnect_entry) {
654 list_del(&it->reconnect_entry);
655 free(it);
657 dnet_counter_destroy(n);
658 pthread_mutex_destroy(&n->reconnect_lock);
659 pthread_mutex_destroy(&n->group_lock);
661 dnet_wait_put(n->wait);
663 free(n->groups);
665 close(n->autodiscovery_socket);
668 void dnet_node_destroy(struct dnet_node *n)
670 dnet_log(n, DNET_LOG_DEBUG, "Destroying node at %s, st: %p.\n",
671 dnet_dump_node(n), n->st);
673 dnet_node_cleanup_common_resources(n);
675 free(n);
678 int dnet_node_set_groups(struct dnet_node *n, int *groups, int group_num)
680 int *g, i;
682 if (groups && !group_num)
683 return -EINVAL;
684 if (group_num && !groups)
685 return -EINVAL;
687 g = malloc(group_num * sizeof(int));
688 if (!g)
689 return -ENOMEM;
691 for (i=0; i<group_num; ++i)
692 g[i] = groups[i];
694 pthread_mutex_lock(&n->group_lock);
695 free(n->groups);
697 n->groups = g;
698 n->group_num = group_num;
699 pthread_mutex_unlock(&n->group_lock);
701 return 0;
704 void dnet_set_timeouts(struct dnet_node *n, int wait_timeout, int check_timeout)
706 n->wait_ts.tv_sec = wait_timeout;
707 n->check_timeout = check_timeout;