2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
/*
 * dnet_node_alloc() - allocate a struct dnet_node and bring up its basic members.
 *
 * Steps visible in this listing, in order:
 *   - malloc() the node and zero it with memset();
 *   - atomic_init() of n->trans to 0;
 *   - dnet_log_init() with cfg->log;
 *   - n->state_lock mutex;
 *   - n->wait via dnet_wait_alloc(0);
 *   - statistics counters via dnet_counter_init();
 *   - n->reconnect_lock and n->group_lock mutexes;
 *   - n->attr pthread attributes, switched to PTHREAD_CREATE_DETACHED;
 *   - the group, empty-state, storage-state, reconnect and check list heads;
 *   - n->cookie, copied from cfg->cookie (DNET_AUTH_COOKIE_SIZE bytes).
 *
 * NOTE(review): this listing is a lossy extraction. The embedded original
 * line numbers skip (27 -> 32 -> 36 ...), so the local declarations, the
 * conditions guarding each error path, the success return and the braces
 * are not visible here. Confirm against the real file before relying on
 * anything not listed above.
 */
27 static struct dnet_node
*dnet_node_alloc(struct dnet_config
*cfg
)
32 n
= malloc(sizeof(struct dnet_node
));
36 memset(n
, 0, sizeof(struct dnet_node
));
38 atomic_init(&n
->trans
, 0);
40 err
= dnet_log_init(n
, cfg
->log
);
44 err
= pthread_mutex_init(&n
->state_lock
, NULL
);
46 dnet_log_err(n
, "Failed to initialize state lock: err: %d", err
);
50 n
->wait
= dnet_wait_alloc(0);
52 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate wait structure.\n");
53 goto err_out_destroy_state
;
56 err
= dnet_counter_init(n
);
58 dnet_log_err(n
, "Failed to initialize statictics counters lock: err: %d", err
);
59 goto err_out_destroy_wait
;
62 err
= pthread_mutex_init(&n
->reconnect_lock
, NULL
);
65 dnet_log_err(n
, "Failed to initialize reconnection lock: err: %d", err
);
66 goto err_out_destroy_counter
;
69 err
= pthread_mutex_init(&n
->group_lock
, NULL
);
72 dnet_log_err(n
, "Failed to initialize group lock: err: %d", err
);
73 goto err_out_destroy_reconnect_lock
;
76 err
= pthread_attr_init(&n
->attr
);
79 dnet_log_err(n
, "Failed to initialize pthread attributes: err: %d", err
);
80 goto err_out_destroy_group_lock
;
82 pthread_attr_setdetachstate(&n
->attr
, PTHREAD_CREATE_DETACHED
);
84 INIT_LIST_HEAD(&n
->group_list
);
85 INIT_LIST_HEAD(&n
->empty_state_list
);
86 INIT_LIST_HEAD(&n
->storage_state_list
);
87 INIT_LIST_HEAD(&n
->reconnect_list
);
89 INIT_LIST_HEAD(&n
->check_entry
);
91 memcpy(n
->cookie
, cfg
->cookie
, DNET_AUTH_COOKIE_SIZE
);
/*
 * Error unwinding. The labels below are on consecutive original lines
 * (95..104), so each one falls through to the next and the resources are
 * released in the reverse order of their initialisation above.
 */
95 err_out_destroy_group_lock
:
96 pthread_mutex_destroy(&n
->group_lock
);
97 err_out_destroy_reconnect_lock
:
98 pthread_mutex_destroy(&n
->reconnect_lock
);
99 err_out_destroy_counter
:
100 dnet_counter_destroy(n
);
101 err_out_destroy_wait
:
102 dnet_wait_put(n
->wait
);
103 err_out_destroy_state
:
104 pthread_mutex_destroy(&n
->state_lock
);
/*
 * dnet_group_create() - allocate a new group object for @group_id.
 *
 * The visible code allocates a struct dnet_group with malloc(), zeroes it,
 * initialises g->refcnt to 1, stores @group_id and initialises the
 * (empty) g->state_list head.
 *
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing (original lines 115-117 and 124-126 are
 * missing) - confirm against the real file.
 */
110 static struct dnet_group
*dnet_group_create(unsigned int group_id
)
112 struct dnet_group
*g
;
114 g
= malloc(sizeof(struct dnet_group
));
118 memset(g
, 0, sizeof(struct dnet_group
));
/* The creator holds the first reference. */
120 atomic_init(&g
->refcnt
, 1);
121 g
->group_id
= group_id
;
123 INIT_LIST_HEAD(&g
->state_list
);
/*
 * dnet_group_destroy() - tear down group @g.
 *
 * If g->state_list is not empty, a "BUG ... reference leak" message is
 * written to stderr. The group is then unlinked from the list it sits on
 * through g->group_entry.
 *
 * NOTE(review): what follows the fprintf() inside the if-block, and any
 * freeing of @g after list_del(), is not visible in this listing
 * (original lines 135-136 and 138+ are missing) - confirm.
 */
131 void dnet_group_destroy(struct dnet_group
*g
)
133 if (!list_empty(&g
->state_list
)) {
134 fprintf(stderr
, "BUG in dnet_group_destroy, reference leak.\n");
137 list_del(&g
->group_entry
);
/*
 * dnet_group_search() - look up a group by @group_id in n->group_list.
 *
 * Walks n->group_list; for an entry whose group_id matches, a reference
 * is taken with dnet_group_get() and stored in 'found' (initially NULL).
 *
 * NOTE(review): the loop exit and the return statement are not visible in
 * this listing; presumably 'found' is returned (NULL when nothing
 * matches). Callers in this file release the result with
 * dnet_group_put().
 */
142 static struct dnet_group
*dnet_group_search(struct dnet_node
*n
, unsigned int group_id
)
144 struct dnet_group
*g
, *found
= NULL
;
146 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
147 if (g
->group_id
== group_id
) {
148 found
= dnet_group_get(g
);
156 static int dnet_idc_compare(const void *k1
, const void *k2
)
158 const struct dnet_state_id
*id1
= k1
;
159 const struct dnet_state_id
*id2
= k2
;
161 return dnet_id_cmp_str(id1
->raw
.id
, id2
->raw
.id
);
/*
 * dnet_idc_remove_ids() - drop the IDs owned by state @st from group @g.
 *
 * Compacts g->ids in place: entries whose idc pointer differs from
 * st->idc are copied down to index 'pos', so entries belonging to @st are
 * squeezed out. The array is then re-sorted with dnet_idc_compare().
 *
 * NOTE(review): the declarations of i/pos, the increment of pos and the
 * update of g->id_num are not visible in this listing - confirm that
 * g->id_num is adjusted before the qsort() call.
 */
164 static void dnet_idc_remove_ids(struct dnet_net_state
*st
, struct dnet_group
*g
)
168 for (i
=0, pos
=0; i
<g
->id_num
; ++i
) {
/* Keep only entries that do not belong to the state being removed. */
169 if (g
->ids
[i
].idc
!= st
->idc
) {
170 g
->ids
[pos
] = g
->ids
[i
];
177 qsort(g
->ids
, g
->id_num
, sizeof(struct dnet_state_id
), dnet_idc_compare
);
/*
 * dnet_idc_create() - register the IDs served by state @st in group @group_id.
 *
 * @st:       network state that owns the IDs
 * @group_id: group the IDs belong to
 * @ids:      array of raw IDs
 * @id_num:   number of elements in @ids
 *
 * Visible flow:
 *   - allocate a struct dnet_idc with room for @id_num dnet_state_id slots
 *     and copy every raw ID into it;
 *   - under n->state_lock, find the group (dnet_group_search) or create it
 *     (dnet_group_create) and link it into n->group_list;
 *   - grow g->ids and append the IDs that bsearch() does not already find
 *     in the group, then re-sort the array;
 *   - link @st into g->state_list and n->storage_state_list;
 *   - call dnet_setup_control_nolock(st);
 *   - after unlocking, log the elapsed time and, when n->server_prio is
 *     non-zero, set IP_TOS on both sockets of @st.
 *
 * 'err' starts as -ENOMEM. The error tail removes the IDs again, unlinks
 * @st from both lists, unlocks and logs the failure with the elapsed time.
 *
 * NOTE(review): this listing is a lossy extraction; the conditions around
 * the goto statements, several assignments (e.g. to 'num' and to the idc
 * back-pointers) and the return statements are not visible. Confirm
 * against the real file.
 */
181 int dnet_idc_create(struct dnet_net_state
*st
, int group_id
, struct dnet_raw_id
*ids
, int id_num
)
183 struct dnet_node
*n
= st
->n
;
184 struct dnet_idc
*idc
;
185 struct dnet_group
*g
;
186 int err
= -ENOMEM
, i
, num
;
187 struct timeval start
, end
;
190 gettimeofday(&start
, NULL
);
/* One allocation: the dnet_idc header followed by id_num dnet_state_id slots. */
192 idc
= malloc(sizeof(struct dnet_idc
) + sizeof(struct dnet_state_id
) * id_num
);
/* Only the header is zeroed; the raw part of each slot is filled by the loop below. */
196 memset(idc
, 0, sizeof(struct dnet_idc
));
198 for (i
=0; i
<id_num
; ++i
) {
199 struct dnet_state_id
*sid
= &idc
->ids
[i
];
200 memcpy(&sid
->raw
, &ids
[i
], sizeof(struct dnet_raw_id
));
/* Group lookup and all list/array updates below run under n->state_lock. */
204 pthread_mutex_lock(&n
->state_lock
);
206 g
= dnet_group_search(n
, group_id
);
208 g
= dnet_group_create(group_id
);
212 list_add_tail(&g
->group_entry
, &n
->group_list
);
/*
 * NOTE(review): the realloc() result is assigned straight back to g->ids,
 * so on failure the pointer to the previous array is overwritten. Check
 * how the (not visible) failure branch deals with that.
 */
215 g
->ids
= realloc(g
->ids
, (g
->id_num
+ id_num
) * sizeof(struct dnet_state_id
));
218 goto err_out_unlock_put
;
/*
 * Append only the IDs that bsearch() does not find among the first
 * g->id_num entries; new entries are written past that range.
 */
222 for (i
=0; i
<id_num
; ++i
) {
223 if (!bsearch(&idc
->ids
[i
], g
->ids
, g
->id_num
, sizeof(struct dnet_state_id
), dnet_idc_compare
)) {
224 memcpy(&g
->ids
[g
->id_num
+ num
], &idc
->ids
[i
], sizeof(struct dnet_state_id
));
231 goto err_out_unlock_put
;
235 qsort(g
->ids
, g
->id_num
, sizeof(struct dnet_state_id
), dnet_idc_compare
);
237 list_add_tail(&st
->state_entry
, &g
->state_list
);
238 list_add_tail(&st
->storage_state_entry
, &n
->storage_state_list
);
240 idc
->id_num
= id_num
;
/* Dump the whole ID table of the group when the log level is above DNET_LOG_DEBUG. */
246 if (n
->log
->log_level
> DNET_LOG_DEBUG
) {
247 for (i
=0; i
<g
->id_num
; ++i
) {
248 struct dnet_state_id
*id
= &g
->ids
[i
];
249 dnet_log(n
, DNET_LOG_DEBUG
, "%d: %s -> %s\n", g
->group_id
,
250 dnet_dump_id_str(id
->raw
.id
), dnet_state_dump_addr(id
->idc
->st
));
254 err
= dnet_setup_control_nolock(st
);
256 goto err_out_remove_nolock
;
258 pthread_mutex_unlock(&n
->state_lock
);
/* Elapsed time in microseconds since the gettimeofday() call at entry. */
260 gettimeofday(&end
, NULL
);
261 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
263 dnet_log(n
, DNET_LOG_NOTICE
, "Initialized group: %d, total ids: %d, added ids: %d out of %d: %ld usecs.\n",
264 g
->group_id
, g
->id_num
, num
, id_num
, diff
);
/*
 * NOTE(review): the option length passed to setsockopt() is the literal 4;
 * presumably this equals sizeof(n->server_prio) - confirm the field type.
 */
266 if (n
->server_prio
) {
267 err
= setsockopt(st
->read_s
, IPPROTO_IP
, IP_TOS
, &n
->server_prio
, 4);
270 dnet_log_err(n
, "could not set read server prio %d", n
->server_prio
);
272 err
= setsockopt(st
->write_s
, IPPROTO_IP
, IP_TOS
, &n
->server_prio
, 4);
275 dnet_log_err(n
, "could not set write server prio %d", n
->server_prio
);
279 dnet_log(n
, DNET_LOG_INFO
, "%s: server net TOS value set to %d\n",
280 dnet_server_convert_dnet_addr(&st
->addr
), n
->server_prio
);
/* Error tail: undo the ID insertion and the list linkage, then unlock. */
286 err_out_remove_nolock
:
287 dnet_idc_remove_ids(st
, g
);
288 list_del_init(&st
->state_entry
);
289 list_del_init(&st
->storage_state_entry
);
293 pthread_mutex_unlock(&n
->state_lock
);
296 gettimeofday(&end
, NULL
);
297 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
298 dnet_log(n
, DNET_LOG_ERROR
, "Failed to initialized group %d with %d ids: err: %d: %ld usecs.\n", group_id
, id_num
, err
, diff
);
/*
 * dnet_idc_destroy_nolock() - release the ID container attached to @st.
 *
 * The only operation visible in this listing is the removal of the IDs of
 * @st from group 'g' via dnet_idc_remove_ids().
 *
 * NOTE(review): how 'idc' and 'g' are obtained and what is freed
 * afterwards is not visible here. The _nolock suffix suggests the caller
 * is expected to hold the relevant lock - confirm.
 */
302 void dnet_idc_destroy_nolock(struct dnet_net_state
*st
)
304 struct dnet_idc
*idc
;
305 struct dnet_group
*g
;
312 dnet_idc_remove_ids(st
, g
);
/*
 * __dnet_idc_search() - find the index in g->ids that corresponds to @id.
 *
 * Bisection over g->ids: 'low' starts at -1 and 'high' at g->id_num, and
 * the loop runs while the two are more than one apart. Each probe
 * compares the probed entry's raw ID with id->id via dnet_id_cmp_str().
 *
 * NOTE(review): the assignment of 'sid', the branches that move low/high
 * and the value returned are not visible in this listing - confirm what
 * index is returned when there is no exact match.
 */
317 static int __dnet_idc_search(struct dnet_group
*g
, struct dnet_id
*id
)
319 int low
, high
, i
, cmp
;
320 struct dnet_state_id
*sid
;
322 for (low
= -1, high
= g
->id_num
; high
-low
> 1; ) {
/* Midpoint of the open interval (low, high). */
323 i
= low
+ (high
- low
)/2;
326 cmp
= dnet_id_cmp_str(sid
->raw
.id
, id
->id
);
343 static struct dnet_state_id
*dnet_idc_search(struct dnet_group
*g
, struct dnet_id
*id
)
345 return &g
->ids
[__dnet_idc_search(g
, id
)];
/*
 * dnet_search_range_nolock() - report the ID range that @id falls into.
 *
 * Looks up the group named by id->group_id, locates the position for @id
 * with __dnet_idc_search(), copies that entry's raw ID to @start, then
 * advances to the following position and copies that entry's raw ID to
 * @next. The group reference taken by dnet_group_search() is dropped with
 * dnet_group_put() before returning.
 *
 * NOTE(review): the missing-group check, the statement executed when the
 * incremented position reaches group->id_num (presumably a wrap to 0) and
 * the return value are not visible in this listing - confirm.
 */
348 static int dnet_search_range_nolock(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_raw_id
*start
, struct dnet_raw_id
*next
)
350 struct dnet_state_id
*sid
;
351 struct dnet_group
*group
;
354 group
= dnet_group_search(n
, id
->group_id
);
358 idc_pos
= __dnet_idc_search(group
, id
);
359 sid
= &group
->ids
[idc_pos
];
360 memcpy(start
, &sid
->raw
, sizeof(struct dnet_raw_id
));
362 if (++idc_pos
>= group
->id_num
)
364 sid
= &group
->ids
[idc_pos
];
365 memcpy(next
, &sid
->raw
, sizeof(struct dnet_raw_id
));
367 dnet_group_put(group
);
/*
 * dnet_search_range() - locked wrapper around dnet_search_range_nolock().
 *
 * Takes n->state_lock, runs the lookup with the same arguments and
 * releases the lock.
 *
 * NOTE(review): the declaration of 'err' and the return statement are not
 * visible in this listing; presumably 'err' is returned - confirm.
 */
372 int dnet_search_range(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_raw_id
*start
, struct dnet_raw_id
*next
)
376 pthread_mutex_lock(&n
->state_lock
);
377 err
= dnet_search_range_nolock(n
, id
, start
, next
);
378 pthread_mutex_unlock(&n
->state_lock
);
/*
 * __dnet_state_search_id() - find the dnet_state_id entry for @id.
 *
 * Looks up the group named by id->group_id, asks dnet_idc_search() for
 * the matching entry in that group and drops the group reference again.
 *
 * NOTE(review): the missing-group check and the return statement are not
 * visible in this listing. No locking is visible here either, so the
 * caller presumably holds n->state_lock - confirm.
 */
383 static struct dnet_state_id
*__dnet_state_search_id(struct dnet_node
*n
, struct dnet_id
*id
)
385 struct dnet_state_id
*sid
;
386 struct dnet_group
*group
;
388 group
= dnet_group_search(n
, id
->group_id
);
392 sid
= dnet_idc_search(group
, id
);
394 dnet_group_put(group
);
/*
 * __dnet_state_search() - find the network state responsible for @id.
 *
 * Resolves @id to a dnet_state_id with __dnet_state_search_id() and
 * returns the owning state (sid->idc->st) through dnet_state_get().
 *
 * NOTE(review): the handling of a failed lookup (original lines 402-405)
 * is not visible in this listing - confirm.
 */
399 static struct dnet_net_state
*__dnet_state_search(struct dnet_node
*n
, struct dnet_id
*id
)
401 struct dnet_state_id
*sid
= __dnet_state_search_id(n
, id
);
406 return dnet_state_get(sid
->idc
->st
);
/*
 * dnet_state_search_by_addr() - find a network state by its address.
 *
 * Under n->state_lock, walks every group in n->group_list and every state
 * in each group's state_list. A state matches when its addr_len equals
 * addr->addr_len and the first addr_len bytes of both address objects
 * compare equal with memcmp(). dnet_state_get() is called on the match.
 *
 * NOTE(review): the assignment to 'found', the loop exits and the return
 * statement are not visible in this listing; presumably 'found' (NULL
 * when nothing matches) is returned - confirm.
 */
409 struct dnet_net_state
*dnet_state_search_by_addr(struct dnet_node
*n
, struct dnet_addr
*addr
)
411 struct dnet_net_state
*st
, *found
= NULL
;
412 struct dnet_group
*g
;
414 pthread_mutex_lock(&n
->state_lock
);
415 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
416 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
417 if (st
->addr
.addr_len
== addr
->addr_len
&&
418 !memcmp(addr
, &st
->addr
, st
->addr
.addr_len
)) {
424 dnet_state_get(found
);
428 pthread_mutex_unlock(&n
->state_lock
);
/*
 * dnet_state_search_nolock() - find the state for @id without taking
 * n->state_lock (callers in this file take it around the call).
 *
 * First tries __dnet_state_search(). The remaining visible code looks up
 * the group named by id->group_id and takes the state that owns the first
 * entry of the group's ID table (g->ids[0]).
 *
 * NOTE(review): the condition under which the g->ids[0] path runs
 * (presumably when the first lookup yields nothing), the group reference
 * release and the return statement are not visible in this listing.
 */
433 struct dnet_net_state
*dnet_state_search_nolock(struct dnet_node
*n
, struct dnet_id
*id
)
435 struct dnet_net_state
*found
;
437 found
= __dnet_state_search(n
, id
);
439 struct dnet_group
*g
;
441 g
= dnet_group_search(n
, id
->group_id
);
445 found
= dnet_state_get(g
->ids
[0].idc
->st
);
/*
 * dnet_state_get_first() - locked lookup of the state for @id.
 *
 * Under n->state_lock, runs dnet_state_search_nolock(). When the result
 * is the node's own state (n->st), the reference just obtained is dropped
 * with dnet_state_put().
 *
 * NOTE(review): what 'found' is set to after the put (presumably NULL)
 * and the return statement are not visible in this listing - confirm.
 */
453 struct dnet_net_state
*dnet_state_get_first(struct dnet_node
*n
, struct dnet_id
*id
)
455 struct dnet_net_state
*found
;
457 pthread_mutex_lock(&n
->state_lock
);
458 found
= dnet_state_search_nolock(n
, id
);
459 if (found
== n
->st
) {
460 dnet_state_put(found
);
464 pthread_mutex_unlock(&n
->state_lock
);
470 * We do not blindly return n->st: it will eventually go away,
471 * because we want multiple states/listen sockets per single node.
/*
 * dnet_node_state() - look up the state that serves the node's own ID.
 *
 * Under n->state_lock, searches for &n->id with
 * dnet_state_search_nolock().
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably 'found' is returned - confirm.
 */
473 struct dnet_net_state
*dnet_node_state(struct dnet_node
*n
)
475 struct dnet_net_state
*found
;
477 pthread_mutex_lock(&n
->state_lock
);
478 found
= dnet_state_search_nolock(n
, &n
->id
);
479 pthread_mutex_unlock(&n
->state_lock
);
/*
 * dnet_node_create() - build and start a node from configuration @cfg.
 *
 * Visible flow:
 *   - block the signal set 'sig' with both pthread_sigmask() and
 *     sigprocmask();
 *   - if DNET_CFG_JOIN_NETWORK is set but cfg->cb is NULL, log "Joining
 *     node has to register a command handler." through cfg->log->log
 *     (when a logger is configured);
 *   - fill in thread-count defaults in @cfg when they are zero:
 *     io_thread_num 1 (20 when joining), nonblocking_io_thread_num 1
 *     (10 when joining with io_thread_num > 100), net_thread_num 1
 *     (8 when joining);
 *   - allocate the node with dnet_node_alloc();
 *   - set cfg->sock_type / proto / family to SOCK_STREAM / IPPROTO_TCP /
 *     AF_INET, and default removal_delay (10) and oplock_num (1024);
 *   - copy the configuration values into the node;
 *   - call dnet_log_init() and log "Elliptics starts";
 *   - apply defaults for wait timeout (5), check timeout
 *     (DNET_DEFAULT_CHECK_TIMEOUT_SEC) and stall count
 *     (DNET_DEFAULT_STALL_TRANSACTIONS) when they are zero;
 *   - run dnet_crypto_init(), dnet_io_init() and
 *     dnet_check_thread_start().
 *
 * The visible error tail calls dnet_crypto_cleanup(), logs "Error during
 * node creation." and invokes cfg->cb->backend_cleanup() when it is set.
 *
 * Note that @cfg itself is modified (defaults are written back into it).
 *
 * NOTE(review): this listing is a lossy extraction. The construction of
 * 'sig', the conditions guarding the sock_type/proto/family assignments,
 * the else-branches, several goto targets and all return statements are
 * not visible - confirm against the real file.
 */
484 struct dnet_node
*dnet_node_create(struct dnet_config
*cfg
)
493 pthread_sigmask(SIG_BLOCK
, &sig
, NULL
);
494 sigprocmask(SIG_BLOCK
, &sig
, NULL
);
/* A node that joins the network must supply backend callbacks in cfg->cb. */
496 if ((cfg
->flags
& DNET_CFG_JOIN_NETWORK
) && (!cfg
->cb
)) {
498 if (cfg
->log
&& cfg
->log
->log
)
499 cfg
->log
->log(cfg
->log
->log_private
, DNET_LOG_ERROR
, "Joining node has to register "
500 "a command handler.\n");
505 * Client must have SINGLE io thread num, since only this can guarantee message order
506 * Messages are picked in dnet_io_process_pool() by different threads, and it is possible that completion
507 * callbacks will be executed out of order, which will badly break things.
509 if (!cfg
->io_thread_num
) {
510 cfg
->io_thread_num
= 1;
511 if (cfg
->flags
& DNET_CFG_JOIN_NETWORK
)
512 cfg
->io_thread_num
= 20;
515 if (!cfg
->nonblocking_io_thread_num
) {
516 cfg
->nonblocking_io_thread_num
= 1;
518 if (cfg
->flags
& DNET_CFG_JOIN_NETWORK
) {
519 if (cfg
->io_thread_num
> 100)
520 cfg
->nonblocking_io_thread_num
= 10;
524 if (!cfg
->net_thread_num
) {
525 cfg
->net_thread_num
= 1;
526 if (cfg
->flags
& DNET_CFG_JOIN_NETWORK
)
527 cfg
->net_thread_num
= 8;
530 n
= dnet_node_alloc(cfg
);
/*
 * NOTE(review): the conditions guarding the next three assignments are
 * not visible here; presumably they only apply when the field is unset.
 */
537 cfg
->sock_type
= SOCK_STREAM
;
539 cfg
->proto
= IPPROTO_TCP
;
541 cfg
->family
= AF_INET
;
543 if (!cfg
->removal_delay
)
544 cfg
->removal_delay
= 10; /* Store removed files 10 days by default */
546 if (!cfg
->oplock_num
)
547 cfg
->oplock_num
= 1024;
/* Copy the (now defaulted) configuration values into the node. */
549 n
->proto
= cfg
->proto
;
550 n
->sock_type
= cfg
->sock_type
;
551 n
->family
= cfg
->family
;
552 n
->wait_ts
.tv_sec
= cfg
->wait_timeout
;
556 n
->notify_hash_size
= cfg
->hash_size
;
557 n
->check_timeout
= cfg
->check_timeout
;
558 n
->stall_count
= cfg
->stall_count
;
559 n
->id
.group_id
= cfg
->group_id
;
560 n
->bg_ionice_class
= cfg
->bg_ionice_class
;
561 n
->bg_ionice_prio
= cfg
->bg_ionice_prio
;
562 n
->removal_delay
= cfg
->removal_delay
;
563 n
->flags
= cfg
->flags
;
564 n
->cache_size
= cfg
->cache_size
;
/*
 * Use cfg->temp_meta_env when it is a non-empty string; the other visible
 * assignment uses cfg->history_env instead (presumably the else-branch).
 * NOTE(review): strlen() assumes temp_meta_env is a valid string - confirm
 * it can never be a NULL pointer.
 */
566 if (strlen(cfg
->temp_meta_env
))
567 n
->temp_meta_env
= cfg
->temp_meta_env
;
569 n
->temp_meta_env
= cfg
->history_env
;
572 dnet_log_init(n
, cfg
->log
);
574 dnet_log(n
, DNET_LOG_INFO
, "Elliptics starts\n");
/* Fall back to built-in defaults for values left at zero. */
576 if (!n
->wait_ts
.tv_sec
) {
577 n
->wait_ts
.tv_sec
= 5;
578 dnet_log(n
, DNET_LOG_NOTICE
, "Using default wait timeout (%ld seconds).\n",
582 if (!n
->check_timeout
) {
583 n
->check_timeout
= DNET_DEFAULT_CHECK_TIMEOUT_SEC
;
584 dnet_log(n
, DNET_LOG_NOTICE
, "Using default check timeout (%ld seconds).\n",
588 if (!n
->stall_count
) {
589 n
->stall_count
= DNET_DEFAULT_STALL_TRANSACTIONS
;
590 dnet_log(n
, DNET_LOG_NOTICE
, "Using default stall count (%ld transactions).\n",
594 n
->client_prio
= cfg
->client_prio
;
595 n
->server_prio
= cfg
->server_prio
;
/* Subsystem start-up: crypto, I/O, then the check thread. */
597 err
= dnet_crypto_init(n
, cfg
->ns
, cfg
->nsize
);
601 err
= dnet_io_init(n
, cfg
);
603 goto err_out_crypto_cleanup
;
605 err
= dnet_check_thread_start(n
);
607 goto err_out_io_exit
;
609 dnet_log(n
, DNET_LOG_DEBUG
, "New node has been created at %s.\n",
/* Error tail (the err_out_io_exit label itself is not visible in this listing). */
615 err_out_crypto_cleanup
:
616 dnet_crypto_cleanup(n
);
620 if (cfg
->log
&& cfg
->log
->log
)
621 cfg
->log
->log(cfg
->log
->log_private
, DNET_LOG_ERROR
, "Error during node creation.\n");
/* Give the backend a chance to release its private data on failure. */
623 if (cfg
->cb
&& cfg
->cb
->backend_cleanup
)
624 cfg
->cb
->backend_cleanup(cfg
->cb
->command_private
);
/*
 * dnet_need_exit() - query whether node @n has been asked to exit.
 *
 * NOTE(review): only the signature is visible in this listing; the
 * description is inferred from the name and must be confirmed against
 * the real body.
 */
628 int dnet_need_exit(struct dnet_node
*n
)
/*
 * dnet_set_need_exit() - mark node @n as needing to exit.
 *
 * NOTE(review): only the signature is visible in this listing; the
 * description is inferred from the name and must be confirmed against
 * the real body.
 */
633 void dnet_set_need_exit(struct dnet_node
*n
)
/*
 * dnet_node_cleanup_common_resources() - release the resources of node @n.
 *
 * Visible steps, in order: stop the check thread, destroy the pthread
 * attributes and n->state_lock, clean up crypto, unlink every entry of
 * n->reconnect_list, destroy the counters, n->reconnect_lock and
 * n->group_lock, and drop the reference on n->wait.
 *
 * NOTE(review): original lines 644-646, 654 and 661+ are missing from this
 * listing, so further teardown steps (for example freeing the unlinked
 * reconnect entries) may exist - confirm.
 */
638 void dnet_node_cleanup_common_resources(struct dnet_node
*n
)
640 struct dnet_addr_storage
*it
, *atmp
;
643 dnet_check_thread_stop(n
);
647 pthread_attr_destroy(&n
->attr
);
649 pthread_mutex_destroy(&n
->state_lock
);
650 dnet_crypto_cleanup(n
);
/* The _safe iterator is required because entries are unlinked while walking. */
652 list_for_each_entry_safe(it
, atmp
, &n
->reconnect_list
, reconnect_entry
) {
653 list_del(&it
->reconnect_entry
);
656 dnet_counter_destroy(n
);
657 pthread_mutex_destroy(&n
->reconnect_lock
);
658 pthread_mutex_destroy(&n
->group_lock
);
660 dnet_wait_put(n
->wait
);
/*
 * dnet_node_destroy() - destroy node @n.
 *
 * Logs the node description and its state pointer at debug level, then
 * releases the shared resources via dnet_node_cleanup_common_resources().
 *
 * NOTE(review): anything after the cleanup call (e.g. freeing @n itself)
 * is not visible in this listing - confirm.
 */
665 void dnet_node_destroy(struct dnet_node
*n
)
667 dnet_log(n
, DNET_LOG_DEBUG
, "Destroying node at %s, st: %p.\n",
668 dnet_dump_node(n
), n
->st
);
670 dnet_node_cleanup_common_resources(n
);
/*
 * dnet_node_set_groups() - install a new list of group IDs on node @n.
 *
 * @groups:    array of group IDs
 * @group_num: number of elements in @groups
 *
 * The two leading checks catch inconsistent arguments: a non-NULL @groups
 * with a zero @group_num, and a non-zero @group_num with a NULL @groups.
 * A private array of @group_num ints is then allocated, filled in a loop
 * over @group_num elements, and n->group_num is updated under
 * n->group_lock.
 *
 * NOTE(review): the statements executed by the two checks, the loop body
 * (presumably copying groups[i]), the handling of the previous array and
 * the return values are not visible in this listing - confirm.
 */
675 int dnet_node_set_groups(struct dnet_node
*n
, int *groups
, int group_num
)
679 if (groups
&& !group_num
)
681 if (group_num
&& !groups
)
684 g
= malloc(group_num
* sizeof(int));
688 for (i
=0; i
<group_num
; ++i
)
/* The swap of the node's group data happens under n->group_lock. */
691 pthread_mutex_lock(&n
->group_lock
);
695 n
->group_num
= group_num
;
696 pthread_mutex_unlock(&n
->group_lock
);
/*
 * dnet_set_timeouts() - override the node's wait and check timeouts.
 *
 * @wait_timeout:  stored in n->wait_ts.tv_sec
 * @check_timeout: stored in n->check_timeout
 *
 * No locking is visible around these assignments.
 */
701 void dnet_set_timeouts(struct dnet_node
*n
, int wait_timeout
, int check_timeout
)
703 n
->wait_ts
.tv_sec
= wait_timeout
;
704 n
->check_timeout
= check_timeout
;