2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
/*
 * dnet_node_alloc - allocate and initialise a struct dnet_node from @cfg.
 *
 * Visible steps, in order: malloc() and zero the node, atomic_init() of
 * n->trans, dnet_log_init(n, cfg->log), init n->state_lock,
 * n->wait = dnet_wait_alloc(0), dnet_counter_init(), init
 * n->reconnect_lock, pthread_attr_init() with PTHREAD_CREATE_DETACHED,
 * then list-head initialisation and a copy of cfg->cookie.
 * The err_out_* labels at the end release these in reverse order.
 *
 * NOTE(review): this listing omits several lines of the original function
 * (allocation-failure checks, the success return and some error labels);
 * these comments describe only the lines that are visible.
 */
27 static struct dnet_node
*dnet_node_alloc(struct dnet_config
*cfg
)
32 n
= malloc(sizeof(struct dnet_node
));
36 memset(n
, 0, sizeof(struct dnet_node
));
38 atomic_init(&n
->trans
, 0);
40 err
= dnet_log_init(n
, cfg
->log
);
44 err
= pthread_mutex_init(&n
->state_lock
, NULL
);
46 dnet_log_err(n
, "Failed to initialize state lock: err: %d", err
);
50 n
->wait
= dnet_wait_alloc(0);
52 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate wait structure.\n");
53 goto err_out_destroy_state
;
56 err
= dnet_counter_init(n
);
58 dnet_log_err(n
, "Failed to initialize statictics counters lock: err: %d", err
);
59 goto err_out_destroy_wait
;
62 err
= pthread_mutex_init(&n
->reconnect_lock
, NULL
);
65 dnet_log_err(n
, "Failed to initialize reconnection lock: err: %d", err
);
66 goto err_out_destroy_counter
;
69 err
= pthread_attr_init(&n
->attr
);
72 dnet_log_err(n
, "Failed to initialize pthread attributes: err: %d", err
);
73 goto err_out_destroy_reconnect_lock
;
/* Threads started with n->attr are created detached.  NOTE(review): the
 * return value of pthread_attr_setdetachstate() is ignored. */
75 pthread_attr_setdetachstate(&n
->attr
, PTHREAD_CREATE_DETACHED
);
/* -1: no autodiscovery socket yet.  dnet_node_cleanup_common_resources()
 * later passes this field to close(). */
77 n
->autodiscovery_socket
= -1;
79 INIT_LIST_HEAD(&n
->group_list
);
80 INIT_LIST_HEAD(&n
->empty_state_list
);
81 INIT_LIST_HEAD(&n
->storage_state_list
);
82 INIT_LIST_HEAD(&n
->reconnect_list
);
84 INIT_LIST_HEAD(&n
->check_entry
);
86 memcpy(n
->cookie
, cfg
->cookie
, DNET_AUTH_COOKIE_SIZE
);
/* Error unwinding: each label undoes one successful init step above. */
90 err_out_destroy_reconnect_lock
:
91 pthread_mutex_destroy(&n
->reconnect_lock
);
92 err_out_destroy_counter
:
93 dnet_counter_destroy(n
);
95 dnet_wait_put(n
->wait
);
96 err_out_destroy_state
:
97 pthread_mutex_destroy(&n
->state_lock
);
/*
 * dnet_group_create - allocate a zeroed struct dnet_group for @group_id,
 * with refcnt initialised to 1 and an empty state_list.  The caller links
 * it into n->group_list (see dnet_idc_create()).
 */
103 static struct dnet_group
*dnet_group_create(unsigned int group_id
)
105 struct dnet_group
*g
;
107 g
= malloc(sizeof(struct dnet_group
));
111 memset(g
, 0, sizeof(struct dnet_group
));
113 atomic_init(&g
->refcnt
, 1);
114 g
->group_id
= group_id
;
116 INIT_LIST_HEAD(&g
->state_list
);
/*
 * dnet_group_destroy - unlink @g from the node's group list.
 *
 * A still non-empty g->state_list is reported on stderr as a reference
 * leak.  The rest of the original body is not visible in this listing.
 */
124 void dnet_group_destroy(struct dnet_group
*g
)
126 if (!list_empty(&g
->state_list
)) {
127 fprintf(stderr
, "BUG in dnet_group_destroy, reference leak.\n");
130 list_del(&g
->group_entry
);
/*
 * dnet_group_search - find the group with @group_id in n->group_list.
 *
 * A matching group is returned with an extra reference from
 * dnet_group_get(); callers such as dnet_search_range_nolock() and
 * __dnet_state_search_id() drop it with dnet_group_put().  found starts
 * as NULL, presumably the result when nothing matches.  The callers
 * visible in this file hold n->state_lock.
 */
135 static struct dnet_group
*dnet_group_search(struct dnet_node
*n
, unsigned int group_id
)
137 struct dnet_group
*g
, *found
= NULL
;
139 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
140 if (g
->group_id
== group_id
) {
141 found
= dnet_group_get(g
);
/*
 * dnet_idc_compare - qsort()/bsearch() comparator for struct dnet_state_id
 * entries, ordering them by raw ID via dnet_id_cmp_str().
 */
149 static int dnet_idc_compare(const void *k1
, const void *k2
)
151 const struct dnet_state_id
*id1
= k1
;
152 const struct dnet_state_id
*id2
= k2
;
154 return dnet_id_cmp_str(id1
->raw
.id
, id2
->raw
.id
);
/*
 * dnet_idc_remove_ids - drop from g->ids every entry whose idc is st->idc.
 *
 * Surviving entries are compacted towards the front of the array (read
 * index i, write index pos).  The array is then re-sorted with
 * dnet_idc_compare so later bsearch()/binary searches remain valid.
 * NOTE(review): the pos increment and the g->id_num update are on lines
 * not visible in this listing; confirm g->id_num ends up equal to pos.
 */
157 static void dnet_idc_remove_ids(struct dnet_net_state
*st
, struct dnet_group
*g
)
161 for (i
=0, pos
=0; i
<g
->id_num
; ++i
) {
162 if (g
->ids
[i
].idc
!= st
->idc
) {
163 g
->ids
[pos
] = g
->ids
[i
];
170 qsort(g
->ids
, g
->id_num
, sizeof(struct dnet_state_id
), dnet_idc_compare
);
/*
 * dnet_idc_create - register the @id_num IDs in @ids for state @st in
 * group @group_id.
 *
 * Visible flow:
 *  - allocate a struct dnet_idc followed by @id_num struct dnet_state_id
 *    slots, and copy @ids into it;
 *  - under n->state_lock, find the group (or create it and link it into
 *    n->group_list);
 *  - grow g->ids and append every ID not already present (bsearch() over
 *    the sorted array), then re-sort;
 *  - link @st into g->state_list and n->storage_state_list;
 *  - dump the table when the log level allows, and call
 *    dnet_setup_control_nolock().
 * After unlocking it logs the elapsed time.  If n->server_prio is
 * non-zero, it also sets that value as IP_TOS on st->read_s and
 * st->write_s.
 *
 * err starts as -ENOMEM.  A failure after @st was linked goes through
 * err_out_remove_nolock, which removes the IDs and unlinks @st before
 * unlocking.  Several lines of the original (NULL checks, num bookkeeping,
 * the success return) are not visible in this listing.
 */
174 int dnet_idc_create(struct dnet_net_state
*st
, int group_id
, struct dnet_raw_id
*ids
, int id_num
)
176 struct dnet_node
*n
= st
->n
;
177 struct dnet_idc
*idc
;
178 struct dnet_group
*g
;
179 int err
= -ENOMEM
, i
, num
;
180 struct timeval start
, end
;
183 gettimeofday(&start
, NULL
);
/* Header plus id_num trailing dnet_state_id entries.  Only the header is
 * zeroed here; the entries are filled from @ids below. */
185 idc
= malloc(sizeof(struct dnet_idc
) + sizeof(struct dnet_state_id
) * id_num
);
189 memset(idc
, 0, sizeof(struct dnet_idc
));
191 for (i
=0; i
<id_num
; ++i
) {
192 struct dnet_state_id
*sid
= &idc
->ids
[i
];
193 memcpy(&sid
->raw
, &ids
[i
], sizeof(struct dnet_raw_id
));
197 pthread_mutex_lock(&n
->state_lock
);
199 g
= dnet_group_search(n
, group_id
);
201 g
= dnet_group_create(group_id
);
205 list_add_tail(&g
->group_entry
, &n
->group_list
);
/* NOTE(review): assigning realloc()'s result straight back to g->ids
 * leaks the old array if realloc() fails; use a temporary pointer. */
208 g
->ids
= realloc(g
->ids
, (g
->id_num
+ id_num
) * sizeof(struct dnet_state_id
));
211 goto err_out_unlock_put
;
/* Append IDs not yet in the group after the existing g->id_num entries.
 * NOTE(review): if g->id_num is only advanced after this loop (not visible
 * here), duplicates inside @ids itself are not caught by this bsearch(). */
215 for (i
=0; i
<id_num
; ++i
) {
216 if (!bsearch(&idc
->ids
[i
], g
->ids
, g
->id_num
, sizeof(struct dnet_state_id
), dnet_idc_compare
)) {
217 memcpy(&g
->ids
[g
->id_num
+ num
], &idc
->ids
[i
], sizeof(struct dnet_state_id
));
224 goto err_out_unlock_put
;
/* Keep g->ids sorted for bsearch() and __dnet_idc_search(). */
228 qsort(g
->ids
, g
->id_num
, sizeof(struct dnet_state_id
), dnet_idc_compare
);
230 list_add_tail(&st
->state_entry
, &g
->state_list
);
231 list_add_tail(&st
->storage_state_entry
, &n
->storage_state_list
);
233 idc
->id_num
= id_num
;
/* NOTE(review): the dump is emitted at DNET_LOG_DEBUG, but only when
 * log_level is strictly greater than DNET_LOG_DEBUG.  Confirm that '>'
 * (not '>=') is intended. */
239 if (n
->log
->log_level
> DNET_LOG_DEBUG
) {
240 for (i
=0; i
<g
->id_num
; ++i
) {
241 struct dnet_state_id
*id
= &g
->ids
[i
];
242 dnet_log(n
, DNET_LOG_DEBUG
, "%d: %s -> %s\n", g
->group_id
,
243 dnet_dump_id_str(id
->raw
.id
), dnet_state_dump_addr(id
->idc
->st
));
247 err
= dnet_setup_control_nolock(st
);
249 goto err_out_remove_nolock
;
251 pthread_mutex_unlock(&n
->state_lock
);
253 gettimeofday(&end
, NULL
);
/* Elapsed wall-clock time in microseconds. */
254 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
256 dnet_log(n
, DNET_LOG_NOTICE
, "Initialized group: %d, total ids: %d, added ids: %d out of %d: %ld usecs.\n",
257 g
->group_id
, g
->id_num
, num
, id_num
, diff
);
/* NOTE(review): the option length is the literal 4 rather than
 * sizeof(n->server_prio); confirm server_prio is a 4-byte int. */
259 if (n
->server_prio
) {
260 err
= setsockopt(st
->read_s
, IPPROTO_IP
, IP_TOS
, &n
->server_prio
, 4);
263 dnet_log_err(n
, "could not set read server prio %d", n
->server_prio
);
265 err
= setsockopt(st
->write_s
, IPPROTO_IP
, IP_TOS
, &n
->server_prio
, 4);
268 dnet_log_err(n
, "could not set write server prio %d", n
->server_prio
);
272 dnet_log(n
, DNET_LOG_INFO
, "%s: server net TOS value set to %d\n",
273 dnet_server_convert_dnet_addr(&st
->addr
), n
->server_prio
);
/* Undo the ID registration and list linkage done above.  n->state_lock
 * is still held at this point. */
279 err_out_remove_nolock
:
280 dnet_idc_remove_ids(st
, g
);
281 list_del_init(&st
->state_entry
);
282 list_del_init(&st
->storage_state_entry
);
286 pthread_mutex_unlock(&n
->state_lock
);
289 gettimeofday(&end
, NULL
);
290 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
291 dnet_log(n
, DNET_LOG_ERROR
, "Failed to initialized group %d with %d ids: err: %d: %ld usecs.\n", group_id
, id_num
, err
, diff
);
/*
 * dnet_idc_destroy_nolock - remove @st's IDs from its group via
 * dnet_idc_remove_ids().  Given the _nolock suffix, the caller presumably
 * holds n->state_lock.  How idc and g are obtained, and any freeing, is on
 * lines not visible in this listing.
 */
295 void dnet_idc_destroy_nolock(struct dnet_net_state
*st
)
297 struct dnet_idc
*idc
;
298 struct dnet_group
*g
;
305 dnet_idc_remove_ids(st
, g
);
/*
 * __dnet_idc_search - binary search of the sorted g->ids table for @id.
 *
 * Maintains the open interval (low, high), starting at (-1, g->id_num),
 * and halves it while high - low > 1, comparing entries against id->id
 * with dnet_id_cmp_str().  The midpoint is computed as
 * low + (high - low) / 2 rather than (low + high) / 2.  The rest of the
 * loop and the return value are not visible in this listing;
 * dnet_idc_search() uses the result directly as an index into g->ids.
 */
310 static int __dnet_idc_search(struct dnet_group
*g
, struct dnet_id
*id
)
312 int low
, high
, i
, cmp
;
313 struct dnet_state_id
*sid
;
315 for (low
= -1, high
= g
->id_num
; high
-low
> 1; ) {
316 i
= low
+ (high
- low
)/2;
319 cmp
= dnet_id_cmp_str(sid
->raw
.id
, id
->id
);
336 static struct dnet_state_id
*dnet_idc_search(struct dnet_group
*g
, struct dnet_id
*id
)
338 return &g
->ids
[__dnet_idc_search(g
, id
)];
/*
 * dnet_search_range_nolock - fill @start with the g->ids entry that
 * __dnet_idc_search() selects for @id, and @next with the entry after it.
 *
 * The group for id->group_id is looked up, taking a reference that is
 * dropped with dnet_group_put() at the end.  The caller holds
 * n->state_lock (see dnet_search_range()).  NOTE(review): two cases are
 * handled on lines not visible in this listing: a missing group, and
 * idc_pos reaching group->id_num (presumably wrapping to 0).
 */
341 static int dnet_search_range_nolock(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_raw_id
*start
, struct dnet_raw_id
*next
)
343 struct dnet_state_id
*sid
;
344 struct dnet_group
*group
;
347 group
= dnet_group_search(n
, id
->group_id
);
351 idc_pos
= __dnet_idc_search(group
, id
);
352 sid
= &group
->ids
[idc_pos
];
353 memcpy(start
, &sid
->raw
, sizeof(struct dnet_raw_id
));
355 if (++idc_pos
>= group
->id_num
)
357 sid
= &group
->ids
[idc_pos
];
358 memcpy(next
, &sid
->raw
, sizeof(struct dnet_raw_id
));
360 dnet_group_put(group
);
/*
 * dnet_search_range - locked wrapper around dnet_search_range_nolock().
 * It holds n->state_lock for the duration of the call and keeps the
 * result in err.  The declaration of err and the return are not visible
 * here.
 */
365 int dnet_search_range(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_raw_id
*start
, struct dnet_raw_id
*next
)
369 pthread_mutex_lock(&n
->state_lock
);
370 err
= dnet_search_range_nolock(n
, id
, start
, next
);
371 pthread_mutex_unlock(&n
->state_lock
);
/*
 * __dnet_state_search_id - return the g->ids entry that dnet_idc_search()
 * selects for @id within group id->group_id.
 *
 * The group reference from dnet_group_search() is dropped after the
 * search.  The result points into group->ids, which dnet_idc_create()
 * may realloc() under n->state_lock, so it should only be used while that
 * lock is held.  Handling of a missing group is not visible here.
 */
376 static struct dnet_state_id
*__dnet_state_search_id(struct dnet_node
*n
, struct dnet_id
*id
)
378 struct dnet_state_id
*sid
;
379 struct dnet_group
*group
;
381 group
= dnet_group_search(n
, id
->group_id
);
385 sid
= dnet_idc_search(group
, id
);
387 dnet_group_put(group
);
/*
 * __dnet_state_search - return the net state that owns the ID entry
 * selected for @id, with a reference taken by dnet_state_get().  The
 * check of sid between the lookup and the return is not visible here.
 */
392 static struct dnet_net_state
*__dnet_state_search(struct dnet_node
*n
, struct dnet_id
*id
)
394 struct dnet_state_id
*sid
= __dnet_state_search_id(n
, id
);
399 return dnet_state_get(sid
->idc
->st
);
/*
 * dnet_state_search_by_addr - find a state whose address equals @addr.
 *
 * Under n->state_lock, walks the state_list of every group.  It compares
 * addr_len first, then the first addr_len bytes with memcmp().  found
 * starts as NULL; a match is returned with a reference from
 * dnet_state_get().
 * NOTE(review): memcmp() starts at the beginning of struct dnet_addr, so
 * this assumes the address bytes are its first member; confirm.
 */
402 struct dnet_net_state
*dnet_state_search_by_addr(struct dnet_node
*n
, struct dnet_addr
*addr
)
404 struct dnet_net_state
*st
, *found
= NULL
;
405 struct dnet_group
*g
;
407 pthread_mutex_lock(&n
->state_lock
);
408 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
409 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
410 if (st
->addr
.addr_len
== addr
->addr_len
&&
411 !memcmp(addr
, &st
->addr
, st
->addr
.addr_len
)) {
417 dnet_state_get(found
);
421 pthread_mutex_unlock(&n
->state_lock
);
/*
 * dnet_state_search_nolock - return a referenced state for @id.
 *
 * Tries __dnet_state_search() first.  The visible fallback looks up the
 * group for id->group_id and returns the state owning g->ids[0].  The
 * conditions guarding the fallback, and the release of the group
 * reference, are on lines not visible here.  Callers in this file hold
 * n->state_lock.
 */
426 struct dnet_net_state
*dnet_state_search_nolock(struct dnet_node
*n
, struct dnet_id
*id
)
428 struct dnet_net_state
*found
;
430 found
= __dnet_state_search(n
, id
);
432 struct dnet_group
*g
;
434 g
= dnet_group_search(n
, id
->group_id
);
438 found
= dnet_state_get(g
->ids
[0].idc
->st
);
/*
 * dnet_state_get_first - look up the state for @id under n->state_lock.
 *
 * When the lookup returns the node's own state (n->st), that reference is
 * dropped with dnet_state_put().  What is then returned is on lines not
 * visible here (presumably NULL).
 */
446 struct dnet_net_state
*dnet_state_get_first(struct dnet_node
*n
, struct dnet_id
*id
)
448 struct dnet_net_state
*found
;
450 pthread_mutex_lock(&n
->state_lock
);
451 found
= dnet_state_search_nolock(n
, id
);
452 if (found
== n
->st
) {
453 dnet_state_put(found
);
457 pthread_mutex_unlock(&n
->state_lock
);
463 * We do not blindly return n->st, since it will eventually go away:
464 * we want to support multiple states/listen sockets per single node.
/* dnet_node_state - look up a referenced state for the node's own ID
 * (n->id) under n->state_lock. */
466 struct dnet_net_state
*dnet_node_state(struct dnet_node
*n
)
468 struct dnet_net_state
*found
;
470 pthread_mutex_lock(&n
->state_lock
);
471 found
= dnet_state_search_nolock(n
, &n
->id
);
472 pthread_mutex_unlock(&n
->state_lock
);
/*
 * dnet_node_create - create and start a node described by @cfg.
 *
 * Visible flow:
 *  - a config with DNET_CFG_JOIN_NETWORK but no command handler (cfg->cb)
 *    is treated as an error, logged through cfg->log;
 *  - missing thread counts are defaulted, with larger values for joining
 *    nodes;
 *  - the node is allocated with dnet_node_alloc();
 *  - socket type/proto/family are assigned SOCK_STREAM/IPPROTO_TCP/AF_INET
 *    (any guarding conditions are not visible), and removal_delay and
 *    oplock_num get defaults.  All of these defaults are written back into
 *    @cfg itself;
 *  - settings are copied from @cfg into the node, with defaults for wait
 *    timeout, check timeout and stall count when they are zero;
 *  - crypto, I/O (dnet_io_init()) and the check thread are started;
 *    failures unwind through the err_out_* labels.
 * On failure an error is logged through cfg->log, and
 * cfg->cb->backend_cleanup() is called when provided.  Several lines of
 * the original (NULL checks, some conditions, returns) are not visible in
 * this listing.
 */
477 struct dnet_node
*dnet_node_create(struct dnet_config
*cfg
)
484 if ((cfg
->flags
& DNET_CFG_JOIN_NETWORK
) && (!cfg
->cb
)) {
486 if (cfg
->log
&& cfg
->log
->log
)
487 cfg
->log
->log(cfg
->log
->log_private
, DNET_LOG_ERROR
, "Joining node has to register "
488 "a command handler.\n");
493 * Client must have SINGLE io thread num, since only this can guarantee message order
494 * Messages are picked in dnet_io_process_pool() by different threads, and it is possible that completion
495 * callbacks will be executed out of order, which will badly break things.
497 if (!cfg
->io_thread_num
) {
498 cfg
->io_thread_num
= 1;
499 if (cfg
->flags
& DNET_CFG_JOIN_NETWORK
)
500 cfg
->io_thread_num
= 20;
503 if (!cfg
->nonblocking_io_thread_num
) {
504 cfg
->nonblocking_io_thread_num
= 1;
506 if (cfg
->flags
& DNET_CFG_JOIN_NETWORK
) {
507 if (cfg
->io_thread_num
> 100)
508 cfg
->nonblocking_io_thread_num
= 10;
512 if (!cfg
->net_thread_num
) {
513 cfg
->net_thread_num
= 1;
514 if (cfg
->flags
& DNET_CFG_JOIN_NETWORK
)
515 cfg
->net_thread_num
= 8;
518 n
= dnet_node_alloc(cfg
);
525 cfg
->sock_type
= SOCK_STREAM
;
527 cfg
->proto
= IPPROTO_TCP
;
529 cfg
->family
= AF_INET
;
531 if (!cfg
->removal_delay
)
532 cfg
->removal_delay
= 10; /* Store removed files 10 days by default */
534 if (!cfg
->oplock_num
)
535 cfg
->oplock_num
= 1024;
537 n
->proto
= cfg
->proto
;
538 n
->sock_type
= cfg
->sock_type
;
539 n
->family
= cfg
->family
;
540 n
->wait_ts
.tv_sec
= cfg
->wait_timeout
;
544 n
->notify_hash_size
= cfg
->hash_size
;
545 n
->check_timeout
= cfg
->check_timeout
;
546 n
->stall_count
= cfg
->stall_count
;
547 n
->id
.group_id
= cfg
->group_id
;
548 n
->bg_ionice_class
= cfg
->bg_ionice_class
;
549 n
->bg_ionice_prio
= cfg
->bg_ionice_prio
;
550 n
->removal_delay
= cfg
->removal_delay
;
551 n
->flags
= cfg
->flags
;
552 n
->cache_size
= cfg
->cache_size
;
/* NOTE(review): as listed, the history_env assignment below would always
 * overwrite temp_meta_env.  Presumably an 'else' sits on a line not
 * visible here; confirm. */
554 if (strlen(cfg
->temp_meta_env
))
555 n
->temp_meta_env
= cfg
->temp_meta_env
;
557 n
->temp_meta_env
= cfg
->history_env
;
/* NOTE(review): dnet_node_alloc() already called dnet_log_init(n,
 * cfg->log); this second call repeats it and ignores the return value. */
560 dnet_log_init(n
, cfg
->log
);
562 dnet_log(n
, DNET_LOG_INFO
, "Elliptics starts\n");
564 if (!n
->wait_ts
.tv_sec
) {
565 n
->wait_ts
.tv_sec
= 5;
566 dnet_log(n
, DNET_LOG_NOTICE
, "Using default wait timeout (%ld seconds).\n",
570 if (!n
->check_timeout
) {
571 n
->check_timeout
= DNET_DEFAULT_CHECK_TIMEOUT_SEC
;
572 dnet_log(n
, DNET_LOG_NOTICE
, "Using default check timeout (%ld seconds).\n",
576 if (!n
->stall_count
) {
577 n
->stall_count
= DNET_DEFAULT_STALL_TRANSACTIONS
;
578 dnet_log(n
, DNET_LOG_NOTICE
, "Using default stall count (%ld transactions).\n",
582 n
->client_prio
= cfg
->client_prio
;
583 n
->server_prio
= cfg
->server_prio
;
585 err
= dnet_crypto_init(n
, cfg
->ns
, cfg
->nsize
);
589 err
= dnet_io_init(n
, cfg
);
591 goto err_out_crypto_cleanup
;
593 err
= dnet_check_thread_start(n
);
595 goto err_out_io_exit
;
597 dnet_log(n
, DNET_LOG_DEBUG
, "New node has been created at %s.\n", dnet_dump_node(n
));
602 err_out_crypto_cleanup
:
603 dnet_crypto_cleanup(n
);
607 if (cfg
->log
&& cfg
->log
->log
)
608 cfg
->log
->log(cfg
->log
->log_private
, DNET_LOG_ERROR
, "Error during node creation.\n");
610 if (cfg
->cb
&& cfg
->cb
->backend_cleanup
)
611 cfg
->cb
->backend_cleanup(cfg
->cb
->command_private
);
/* dnet_need_exit - presumably reports whether dnet_set_need_exit() has been
 * called for @n; the body is not visible in this listing. */
615 int dnet_need_exit(struct dnet_node
*n
)
/* dnet_set_need_exit - presumably flags @n for shutdown (see
 * dnet_need_exit()); the body is not visible in this listing. */
620 void dnet_set_need_exit(struct dnet_node
*n
)
/*
 * dnet_node_cleanup_common_resources - tear down resources set up by
 * dnet_node_alloc() and dnet_node_create().  Visible steps:
 *  - stop the check thread;
 *  - destroy n->attr and n->state_lock;
 *  - clean up crypto;
 *  - unlink every n->reconnect_list entry;
 *  - destroy the counters and n->reconnect_lock;
 *  - drop n->wait and close n->autodiscovery_socket.
 * Some lines between these calls are not visible in this listing.
 */
625 void dnet_node_cleanup_common_resources(struct dnet_node
*n
)
627 struct dnet_addr_storage
*it
, *atmp
;
630 dnet_check_thread_stop(n
);
634 pthread_attr_destroy(&n
->attr
);
636 pthread_mutex_destroy(&n
->state_lock
);
637 dnet_crypto_cleanup(n
);
/* _safe variant: entries are unlinked while the list is being walked. */
639 list_for_each_entry_safe(it
, atmp
, &n
->reconnect_list
, reconnect_entry
) {
640 list_del(&it
->reconnect_entry
);
643 dnet_counter_destroy(n
);
644 pthread_mutex_destroy(&n
->reconnect_lock
);
646 dnet_wait_put(n
->wait
);
/* NOTE(review): dnet_node_alloc() initialises this fd to -1.  Unless a
 * guard on a line not visible here skips it, close(-1) just fails with
 * EBADF. */
648 close(n
->autodiscovery_socket
);
/*
 * dnet_node_destroy - log the node's address and n->st, then release its
 * shared resources through dnet_node_cleanup_common_resources().  Any
 * freeing of @n itself is on lines not visible in this listing.
 */
651 void dnet_node_destroy(struct dnet_node
*n
)
653 dnet_log(n
, DNET_LOG_DEBUG
, "Destroying node at %s, st: %p.\n",
654 dnet_dump_node(n
), n
->st
);
656 dnet_node_cleanup_common_resources(n
);
/*
 * dnet_session_create - allocate a struct dnet_session for node @n.
 * The rest of the body (result check, setup, return) is not visible in
 * this listing.
 */
661 struct dnet_session
*dnet_session_create(struct dnet_node
*n
)
663 struct dnet_session
*s
;
665 s
= (struct dnet_session
*)malloc(sizeof(struct dnet_session
));
/*
 * dnet_session_destroy - log, at debug level, that session @s is being
 * destroyed, together with its node's address and node->st.  Releasing @s
 * itself is on lines not visible in this listing.
 */
676 void dnet_session_destroy(struct dnet_session
*s
)
678 dnet_log(s
->node
, DNET_LOG_DEBUG
, "Destroying session at %s, st: %p.\n",
679 dnet_dump_node(s
->node
), s
->node
->st
);
/*
 * dnet_session_set_groups - set the group list of session @s from the
 * @group_num entries of @groups.
 *
 * Two inconsistent combinations are checked up front: groups without a
 * count, and a count without groups.  Their branch bodies are not visible
 * here.  A new int array of @group_num entries is malloc()ed and,
 * presumably, filled from @groups in the loop before s->group_num is
 * updated.
 * NOTE(review): group_num is a signed int, and group_num * sizeof(int) is
 * not range-checked before malloc().  Confirm callers never pass a
 * negative or huge count.
 */
683 int dnet_session_set_groups(struct dnet_session
*s
, int *groups
, int group_num
)
687 if (groups
&& !group_num
)
689 if (group_num
&& !groups
)
692 g
= malloc(group_num
* sizeof(int));
696 for (i
=0; i
<group_num
; ++i
)
702 s
->group_num
= group_num
;
/*
 * dnet_set_timeouts - set the node's wait timeout (n->wait_ts.tv_sec) and
 * check timeout (n->check_timeout).  Units are presumably seconds,
 * matching the "(%ld seconds)" default-timeout messages in
 * dnet_node_create().
 */
707 void dnet_set_timeouts(struct dnet_node
*n
, int wait_timeout
, int check_timeout
)
709 n
->wait_ts
.tv_sec
= wait_timeout
;
710 n
->check_timeout
= check_timeout
;