2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
24 #include "elliptics.h"
25 #include "elliptics/interface.h"
/*
 * Allocate a struct dnet_node, zero it and initialize its core members from
 * @cfg: transaction counter, logger, state lock, wait structure, statistics
 * counters, reconnect and group locks, pthread attributes (detached),
 * list heads and the authentication cookie.
 *
 * The labels at the end undo the initialization steps in reverse order.
 *
 * NOTE(review): this listing omits lines that carry no identifiers (braces,
 * error checks, return statements), so the exact conditions guarding each
 * goto and the returned values are not visible here.
 */
27 static struct dnet_node
*dnet_node_alloc(struct dnet_config
*cfg
)
32 n
= malloc(sizeof(struct dnet_node
));
36 memset(n
, 0, sizeof(struct dnet_node
));
38 atomic_init(&n
->trans
, 0);
40 err
= dnet_log_init(n
, cfg
->log
);
44 err
= pthread_mutex_init(&n
->state_lock
, NULL
);
46 dnet_log_err(n
, "Failed to initialize state lock: err: %d", err
);
50 n
->wait
= dnet_wait_alloc(0);
52 dnet_log(n
, DNET_LOG_ERROR
, "Failed to allocate wait structure.\n");
53 goto err_out_destroy_state
;
56 err
= dnet_counter_init(n
);
58 dnet_log_err(n
, "Failed to initialize statictics counters lock: err: %d", err
);
59 goto err_out_destroy_wait
;
62 err
= pthread_mutex_init(&n
->reconnect_lock
, NULL
);
65 dnet_log_err(n
, "Failed to initialize reconnection lock: err: %d", err
);
66 goto err_out_destroy_counter
;
69 err
= pthread_mutex_init(&n
->group_lock
, NULL
);
72 dnet_log_err(n
, "Failed to initialize group lock: err: %d", err
);
73 goto err_out_destroy_reconnect_lock
;
76 err
= pthread_attr_init(&n
->attr
);
79 dnet_log_err(n
, "Failed to initialize pthread attributes: err: %d", err
);
80 goto err_out_destroy_group_lock
;
/* Threads created with n->attr are detached. */
82 pthread_attr_setdetachstate(&n
->attr
, PTHREAD_CREATE_DETACHED
);
/* -1 marks the autodiscovery socket as not opened. */
84 n
->autodiscovery_socket
= -1;
86 INIT_LIST_HEAD(&n
->group_list
);
87 INIT_LIST_HEAD(&n
->empty_state_list
);
88 INIT_LIST_HEAD(&n
->storage_state_list
);
89 INIT_LIST_HEAD(&n
->reconnect_list
);
91 INIT_LIST_HEAD(&n
->check_entry
);
93 memcpy(n
->cookie
, cfg
->cookie
, DNET_AUTH_COOKIE_SIZE
);
/* Error unwinding: each label releases what was set up before the failing step. */
97 err_out_destroy_group_lock
:
98 pthread_mutex_destroy(&n
->group_lock
);
99 err_out_destroy_reconnect_lock
:
100 pthread_mutex_destroy(&n
->reconnect_lock
);
101 err_out_destroy_counter
:
102 dnet_counter_destroy(n
);
103 err_out_destroy_wait
:
104 dnet_wait_put(n
->wait
);
105 err_out_destroy_state
:
106 pthread_mutex_destroy(&n
->state_lock
);
/*
 * Allocate a zeroed struct dnet_group for @group_id with its reference
 * counter set to 1 and an empty state list.
 *
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
112 static struct dnet_group
*dnet_group_create(unsigned int group_id
)
114 struct dnet_group
*g
;
116 g
= malloc(sizeof(struct dnet_group
));
120 memset(g
, 0, sizeof(struct dnet_group
));
122 atomic_init(&g
->refcnt
, 1);
123 g
->group_id
= group_id
;
125 INIT_LIST_HEAD(&g
->state_list
);
/*
 * Tear down group @g: report a bug on stderr if the group still has states
 * linked into its state list, and unlink the group from the list it is a
 * member of via group_entry.
 *
 * NOTE(review): any freeing of @g or of its id array is not visible in this
 * listing.
 */
133 void dnet_group_destroy(struct dnet_group
*g
)
135 if (!list_empty(&g
->state_list
)) {
136 fprintf(stderr
, "BUG in dnet_group_destroy, reference leak.\n");
139 list_del(&g
->group_entry
);
/*
 * Walk n->group_list looking for a group whose group_id equals @group_id.
 * On a match a reference is taken with dnet_group_get() and stored in
 * 'found', which starts out as NULL.
 *
 * NOTE(review): the loop exit and the return statement are not visible in
 * this listing; presumably 'found' is returned - confirm.
 */
144 static struct dnet_group
*dnet_group_search(struct dnet_node
*n
, unsigned int group_id
)
146 struct dnet_group
*g
, *found
= NULL
;
148 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
149 if (g
->group_id
== group_id
) {
150 found
= dnet_group_get(g
);
/*
 * Comparison callback over struct dnet_state_id elements (it is passed to
 * qsort() and bsearch() elsewhere in this file). Orders two entries by their
 * raw.id bytes using dnet_id_cmp_str().
 */
158 static int dnet_idc_compare(const void *k1
, const void *k2
)
160 const struct dnet_state_id
*id1
= k1
;
161 const struct dnet_state_id
*id2
= k2
;
163 return dnet_id_cmp_str(id1
->raw
.id
, id2
->raw
.id
);
/*
 * Drop from group @g every id that belongs to state @st: entries whose idc
 * differs from st->idc are copied down to position 'pos', compacting the
 * array in place, and the array is then re-sorted with dnet_idc_compare().
 *
 * NOTE(review): the increment of 'pos' and the update of g->id_num are not
 * visible in this listing - confirm against the full source.
 */
166 static void dnet_idc_remove_ids(struct dnet_net_state
*st
, struct dnet_group
*g
)
170 for (i
=0, pos
=0; i
<g
->id_num
; ++i
) {
171 if (g
->ids
[i
].idc
!= st
->idc
) {
172 g
->ids
[pos
] = g
->ids
[i
];
179 qsort(g
->ids
, g
->id_num
, sizeof(struct dnet_state_id
), dnet_idc_compare
);
/*
 * Register the @id_num raw ids in @ids as served by state @st in group
 * @group_id.
 *
 * Visible steps:
 *  - allocate a struct dnet_idc with room for @id_num struct dnet_state_id
 *    entries and copy the raw ids into it;
 *  - under n->state_lock, look up the group (creating it and appending it to
 *    n->group_list when needed), grow g->ids, and append every id that
 *    bsearch() does not already find in the group;
 *  - re-sort g->ids, link @st into the group's state list and into
 *    n->storage_state_list;
 *  - call dnet_setup_control_nolock(), drop the lock and log timing;
 *  - when n->server_prio is set, apply it as IP_TOS on both sockets of @st.
 *
 * On the error path the ids of @st are removed again, @st is unlinked from
 * both lists, the lock is released and the failure is logged with timing.
 *
 * NOTE(review): this listing omits lines without identifiers (braces, error
 * checks, returns, some assignments), so return values and the exact
 * conditions guarding each goto are not visible here.
 */
183 int dnet_idc_create(struct dnet_net_state
*st
, int group_id
, struct dnet_raw_id
*ids
, int id_num
)
185 struct dnet_node
*n
= st
->n
;
186 struct dnet_idc
*idc
;
187 struct dnet_group
*g
;
188 int err
= -ENOMEM
, i
, num
;
189 struct timeval start
, end
;
192 gettimeofday(&start
, NULL
);
194 idc
= malloc(sizeof(struct dnet_idc
) + sizeof(struct dnet_state_id
) * id_num
);
/* Only the header is zeroed; the trailing ids are filled in by the loop below. */
198 memset(idc
, 0, sizeof(struct dnet_idc
));
200 for (i
=0; i
<id_num
; ++i
) {
201 struct dnet_state_id
*sid
= &idc
->ids
[i
];
202 memcpy(&sid
->raw
, &ids
[i
], sizeof(struct dnet_raw_id
));
206 pthread_mutex_lock(&n
->state_lock
);
208 g
= dnet_group_search(n
, group_id
);
210 g
= dnet_group_create(group_id
);
214 list_add_tail(&g
->group_entry
, &n
->group_list
);
/*
 * NOTE(review): the result of realloc() is assigned straight to g->ids, so
 * on failure the previous buffer pointer is overwritten; the failure
 * handling is not visible in this listing - confirm.
 */
217 g
->ids
= realloc(g
->ids
, (g
->id_num
+ id_num
) * sizeof(struct dnet_state_id
));
220 goto err_out_unlock_put
;
/*
 * Append only ids not yet present in the sorted part of g->ids.
 * NOTE(review): the initialization and increment of 'num' are not visible here.
 */
224 for (i
=0; i
<id_num
; ++i
) {
225 if (!bsearch(&idc
->ids
[i
], g
->ids
, g
->id_num
, sizeof(struct dnet_state_id
), dnet_idc_compare
)) {
226 memcpy(&g
->ids
[g
->id_num
+ num
], &idc
->ids
[i
], sizeof(struct dnet_state_id
));
233 goto err_out_unlock_put
;
/* Restore sort order so later bsearch()/binary searches keep working. */
237 qsort(g
->ids
, g
->id_num
, sizeof(struct dnet_state_id
), dnet_idc_compare
);
239 list_add_tail(&st
->state_entry
, &g
->state_list
);
240 list_add_tail(&st
->storage_state_entry
, &n
->storage_state_list
);
242 idc
->id_num
= id_num
;
/*
 * Dump the whole id table of the group.
 * NOTE(review): the test is 'log_level > DNET_LOG_DEBUG' while the messages
 * are emitted at DNET_LOG_DEBUG - confirm this is the intended threshold.
 */
248 if (n
->log
->log_level
> DNET_LOG_DEBUG
) {
249 for (i
=0; i
<g
->id_num
; ++i
) {
250 struct dnet_state_id
*id
= &g
->ids
[i
];
251 dnet_log(n
, DNET_LOG_DEBUG
, "%d: %s -> %s\n", g
->group_id
,
252 dnet_dump_id_str(id
->raw
.id
), dnet_state_dump_addr(id
->idc
->st
));
256 err
= dnet_setup_control_nolock(st
);
258 goto err_out_remove_nolock
;
260 pthread_mutex_unlock(&n
->state_lock
);
262 gettimeofday(&end
, NULL
);
/* Elapsed time in microseconds. */
263 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
265 dnet_log(n
, DNET_LOG_NOTICE
, "Initialized group: %d, total ids: %d, added ids: %d out of %d: %ld usecs.\n",
266 g
->group_id
, g
->id_num
, num
, id_num
, diff
);
/*
 * NOTE(review): the option length is the literal 4; presumably this is
 * sizeof(n->server_prio) - confirm.
 */
268 if (n
->server_prio
) {
269 err
= setsockopt(st
->read_s
, IPPROTO_IP
, IP_TOS
, &n
->server_prio
, 4);
272 dnet_log_err(n
, "could not set read server prio %d", n
->server_prio
);
274 err
= setsockopt(st
->write_s
, IPPROTO_IP
, IP_TOS
, &n
->server_prio
, 4);
277 dnet_log_err(n
, "could not set write server prio %d", n
->server_prio
);
281 dnet_log(n
, DNET_LOG_INFO
, "%s: server net TOS value set to %d\n",
282 dnet_server_convert_dnet_addr(&st
->addr
), n
->server_prio
);
/* Error path: roll back the id registration while still holding state_lock. */
288 err_out_remove_nolock
:
289 dnet_idc_remove_ids(st
, g
);
290 list_del_init(&st
->state_entry
);
291 list_del_init(&st
->storage_state_entry
);
295 pthread_mutex_unlock(&n
->state_lock
);
298 gettimeofday(&end
, NULL
);
299 diff
= (end
.tv_sec
- start
.tv_sec
) * 1000000 + end
.tv_usec
- start
.tv_usec
;
300 dnet_log(n
, DNET_LOG_ERROR
, "Failed to initialized group %d with %d ids: err: %d: %ld usecs.\n", group_id
, id_num
, err
, diff
);
/*
 * Remove the ids owned by state @st from its group by calling
 * dnet_idc_remove_ids().
 *
 * NOTE(review): how 'idc' and 'g' are obtained and what else is released is
 * not visible in this listing. The "_nolock" suffix suggests the caller is
 * expected to hold the node's state lock - confirm.
 */
304 void dnet_idc_destroy_nolock(struct dnet_net_state
*st
)
306 struct dnet_idc
*idc
;
307 struct dnet_group
*g
;
314 dnet_idc_remove_ids(st
, g
);
/*
 * Bisection over the id array of group @g for @id. The interval starts as
 * (low = -1, high = g->id_num) and is narrowed while it is wider than one
 * element; each probe compares the candidate's raw.id with id->id via
 * dnet_id_cmp_str().
 *
 * NOTE(review): the assignment of 'sid', the branch taken on 'cmp' and the
 * returned index are not visible in this listing.
 */
319 static int __dnet_idc_search(struct dnet_group
*g
, struct dnet_id
*id
)
321 int low
, high
, i
, cmp
;
322 struct dnet_state_id
*sid
;
324 for (low
= -1, high
= g
->id_num
; high
-low
> 1; ) {
/* Midpoint computed as low + (high - low)/2. */
325 i
= low
+ (high
- low
)/2;
328 cmp
= dnet_id_cmp_str(sid
->raw
.id
, id
->id
);
/*
 * Return a pointer to the entry of g->ids located at the index reported by
 * __dnet_idc_search() for @id.
 */
345 static struct dnet_state_id
*dnet_idc_search(struct dnet_group
*g
, struct dnet_id
*id
)
347 return &g
->ids
[__dnet_idc_search(g
, id
)];
/*
 * Find the id range that @id falls into within its group: the raw id found
 * by __dnet_idc_search() is copied into @start, and the raw id at the
 * following position is copied into @next. The group reference obtained
 * from dnet_group_search() is dropped with dnet_group_put() afterwards.
 *
 * NOTE(review): the statement executed when the incremented position
 * reaches group->id_num is not visible in this listing (presumably a wrap
 * to index 0 - confirm), nor are the missing-group check and the return
 * value.
 */
350 static int dnet_search_range_nolock(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_raw_id
*start
, struct dnet_raw_id
*next
)
352 struct dnet_state_id
*sid
;
353 struct dnet_group
*group
;
356 group
= dnet_group_search(n
, id
->group_id
);
360 idc_pos
= __dnet_idc_search(group
, id
);
361 sid
= &group
->ids
[idc_pos
];
362 memcpy(start
, &sid
->raw
, sizeof(struct dnet_raw_id
));
364 if (++idc_pos
>= group
->id_num
)
366 sid
= &group
->ids
[idc_pos
];
367 memcpy(next
, &sid
->raw
, sizeof(struct dnet_raw_id
));
369 dnet_group_put(group
);
/*
 * Locked wrapper: takes n->state_lock, calls dnet_search_range_nolock()
 * with the same arguments and releases the lock.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably 'err' is returned - confirm.
 */
374 int dnet_search_range(struct dnet_node
*n
, struct dnet_id
*id
, struct dnet_raw_id
*start
, struct dnet_raw_id
*next
)
378 pthread_mutex_lock(&n
->state_lock
);
379 err
= dnet_search_range_nolock(n
, id
, start
, next
);
380 pthread_mutex_unlock(&n
->state_lock
);
/*
 * Look up the group named by id->group_id, locate the struct dnet_state_id
 * for @id inside it with dnet_idc_search(), and drop the group reference.
 *
 * NOTE(review): the missing-group check and the return statement are not
 * visible in this listing. No locking is performed here; callers in this
 * file invoke it from "_nolock" paths - confirm the locking contract.
 */
385 static struct dnet_state_id
*__dnet_state_search_id(struct dnet_node
*n
, struct dnet_id
*id
)
387 struct dnet_state_id
*sid
;
388 struct dnet_group
*group
;
390 group
= dnet_group_search(n
, id
->group_id
);
394 sid
= dnet_idc_search(group
, id
);
396 dnet_group_put(group
);
/*
 * Resolve @id to its network state: finds the struct dnet_state_id via
 * __dnet_state_search_id() and returns the owning state (sid->idc->st) with
 * a reference taken through dnet_state_get().
 *
 * NOTE(review): the handling of a failed lookup is not visible in this
 * listing.
 */
401 static struct dnet_net_state
*__dnet_state_search(struct dnet_node
*n
, struct dnet_id
*id
)
403 struct dnet_state_id
*sid
= __dnet_state_search_id(n
, id
);
408 return dnet_state_get(sid
->idc
->st
);
/*
 * Search every state of every group, under n->state_lock, for one whose
 * address matches @addr: the address lengths must be equal and the first
 * st->addr.addr_len bytes must compare equal with memcmp(). A reference is
 * taken on the match with dnet_state_get().
 *
 * NOTE(review): the assignment of 'found', the loop exits and the return
 * statement are not visible in this listing.
 */
411 struct dnet_net_state
*dnet_state_search_by_addr(struct dnet_node
*n
, struct dnet_addr
*addr
)
413 struct dnet_net_state
*st
, *found
= NULL
;
414 struct dnet_group
*g
;
416 pthread_mutex_lock(&n
->state_lock
);
417 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
418 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
419 if (st
->addr
.addr_len
== addr
->addr_len
&&
420 !memcmp(addr
, &st
->addr
, st
->addr
.addr_len
)) {
426 dnet_state_get(found
);
430 pthread_mutex_unlock(&n
->state_lock
);
/*
 * Find the state responsible for @id without taking any lock. First tries
 * __dnet_state_search(); there is also a path that looks up the group by
 * id->group_id and takes a reference on the state owning the group's first
 * id (g->ids[0].idc->st).
 *
 * NOTE(review): the condition selecting the second path, the group
 * reference release and the return statement are not visible in this
 * listing.
 */
435 struct dnet_net_state
*dnet_state_search_nolock(struct dnet_node
*n
, struct dnet_id
*id
)
437 struct dnet_net_state
*found
;
439 found
= __dnet_state_search(n
, id
);
441 struct dnet_group
*g
;
443 g
= dnet_group_search(n
, id
->group_id
);
447 found
= dnet_state_get(g
->ids
[0].idc
->st
);
/*
 * Under n->state_lock, look up the state for @id with
 * dnet_state_search_nolock(). When the result is the node's own state
 * (n->st) the reference just taken is dropped again.
 *
 * NOTE(review): what 'found' is set to after that put, and the return
 * statement, are not visible in this listing.
 */
455 struct dnet_net_state
*dnet_state_get_first(struct dnet_node
*n
, struct dnet_id
*id
)
457 struct dnet_net_state
*found
;
459 pthread_mutex_lock(&n
->state_lock
);
460 found
= dnet_state_search_nolock(n
, id
);
461 if (found
== n
->st
) {
462 dnet_state_put(found
);
466 pthread_mutex_unlock(&n
->state_lock
);
472 * We do not blindly return n->st: it will eventually go away,
473 * because we want to support multiple states/listen sockets per single node
/*
 * Look up the state that serves the node's own id (n->id) by calling
 * dnet_state_search_nolock() while holding n->state_lock.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably 'found' is returned - confirm.
 */
475 struct dnet_net_state
*dnet_node_state(struct dnet_node
*n
)
477 struct dnet_net_state
*found
;
479 pthread_mutex_lock(&n
->state_lock
);
480 found
= dnet_state_search_nolock(n
, &n
->id
);
481 pthread_mutex_unlock(&n
->state_lock
);
/*
 * Create and start a node from configuration @cfg.
 *
 * Visible steps:
 *  - block a signal set for the calling thread and the process;
 *  - reject a joining node (DNET_CFG_JOIN_NETWORK) that has no command
 *    handler (cfg->cb), logging through cfg->log when available;
 *  - fill in defaults for thread counts that are zero in @cfg:
 *    io_thread_num 1 (20 when joining), nonblocking_io_thread_num 1
 *    (10 when joining with io_thread_num > 100), net_thread_num 1
 *    (8 when joining);
 *  - allocate the node with dnet_node_alloc();
 *  - set socket type/protocol/family in @cfg and default removal_delay (10)
 *    and oplock_num (1024) when zero;
 *  - copy configuration values into the node and apply default wait
 *    timeout, check timeout and stall count when they are zero;
 *  - initialize crypto, I/O and the check thread.
 *
 * On failure the visible cleanup calls dnet_crypto_cleanup(), logs an error
 * through cfg->log and invokes cfg->cb->backend_cleanup() when present.
 *
 * NOTE(review): @cfg is modified in place. This listing omits lines without
 * identifiers (braces, error checks, some labels, returns), so the exact
 * conditions and return values are not visible here.
 */
486 struct dnet_node
*dnet_node_create(struct dnet_config
*cfg
)
/* The same set 'sig' is blocked through both pthread_sigmask() and sigprocmask(). */
495 pthread_sigmask(SIG_BLOCK
, &sig
, NULL
);
496 sigprocmask(SIG_BLOCK
, &sig
, NULL
);
498 if ((cfg
->flags
& DNET_CFG_JOIN_NETWORK
) && (!cfg
->cb
)) {
500 if (cfg
->log
&& cfg
->log
->log
)
501 cfg
->log
->log(cfg
->log
->log_private
, DNET_LOG_ERROR
, "Joining node has to register "
502 "a command handler.\n");
507 * Client must have SINGLE io thread num, since only this can guarantee message order
508 * Messages are picked in dnet_io_process_pool() by different threads, and it is possible that completion
509 * callbacks will be executed out of order, which will badly break things.
511 if (!cfg
->io_thread_num
) {
512 cfg
->io_thread_num
= 1;
513 if (cfg
->flags
& DNET_CFG_JOIN_NETWORK
)
514 cfg
->io_thread_num
= 20;
517 if (!cfg
->nonblocking_io_thread_num
) {
518 cfg
->nonblocking_io_thread_num
= 1;
520 if (cfg
->flags
& DNET_CFG_JOIN_NETWORK
) {
521 if (cfg
->io_thread_num
> 100)
522 cfg
->nonblocking_io_thread_num
= 10;
526 if (!cfg
->net_thread_num
) {
527 cfg
->net_thread_num
= 1;
528 if (cfg
->flags
& DNET_CFG_JOIN_NETWORK
)
529 cfg
->net_thread_num
= 8;
532 n
= dnet_node_alloc(cfg
);
/* NOTE(review): the conditions guarding these three assignments are not visible here. */
539 cfg
->sock_type
= SOCK_STREAM
;
541 cfg
->proto
= IPPROTO_TCP
;
543 cfg
->family
= AF_INET
;
545 if (!cfg
->removal_delay
)
546 cfg
->removal_delay
= 10; /* Store removed files 10 days by default */
548 if (!cfg
->oplock_num
)
549 cfg
->oplock_num
= 1024;
551 n
->proto
= cfg
->proto
;
552 n
->sock_type
= cfg
->sock_type
;
553 n
->family
= cfg
->family
;
554 n
->wait_ts
.tv_sec
= cfg
->wait_timeout
;
558 n
->notify_hash_size
= cfg
->hash_size
;
559 n
->check_timeout
= cfg
->check_timeout
;
560 n
->stall_count
= cfg
->stall_count
;
561 n
->id
.group_id
= cfg
->group_id
;
562 n
->bg_ionice_class
= cfg
->bg_ionice_class
;
563 n
->bg_ionice_prio
= cfg
->bg_ionice_prio
;
564 n
->removal_delay
= cfg
->removal_delay
;
565 n
->flags
= cfg
->flags
;
566 n
->cache_size
= cfg
->cache_size
;
/*
 * Use cfg->temp_meta_env when it is a non-empty string.
 * NOTE(review): the 'else' joining the two assignments is not visible here;
 * presumably cfg->history_env is the fallback - confirm.
 */
568 if (strlen(cfg
->temp_meta_env
))
569 n
->temp_meta_env
= cfg
->temp_meta_env
;
571 n
->temp_meta_env
= cfg
->history_env
;
/* NOTE(review): dnet_node_alloc() also calls dnet_log_init() with cfg->log. */
574 dnet_log_init(n
, cfg
->log
);
576 dnet_log(n
, DNET_LOG_INFO
, "Elliptics starts\n");
578 if (!n
->wait_ts
.tv_sec
) {
579 n
->wait_ts
.tv_sec
= 5;
580 dnet_log(n
, DNET_LOG_NOTICE
, "Using default wait timeout (%ld seconds).\n",
584 if (!n
->check_timeout
) {
585 n
->check_timeout
= DNET_DEFAULT_CHECK_TIMEOUT_SEC
;
586 dnet_log(n
, DNET_LOG_NOTICE
, "Using default check timeout (%ld seconds).\n",
590 if (!n
->stall_count
) {
591 n
->stall_count
= DNET_DEFAULT_STALL_TRANSACTIONS
;
592 dnet_log(n
, DNET_LOG_NOTICE
, "Using default stall count (%ld transactions).\n",
596 n
->client_prio
= cfg
->client_prio
;
597 n
->server_prio
= cfg
->server_prio
;
599 err
= dnet_crypto_init(n
, cfg
->ns
, cfg
->nsize
);
603 err
= dnet_io_init(n
, cfg
);
605 goto err_out_crypto_cleanup
;
607 err
= dnet_check_thread_start(n
);
609 goto err_out_io_exit
;
611 dnet_log(n
, DNET_LOG_DEBUG
, "New node has been created at %s.\n", dnet_dump_node(n
));
/* Error unwinding; the err_out_io_exit label itself is not visible in this listing. */
616 err_out_crypto_cleanup
:
617 dnet_crypto_cleanup(n
);
621 if (cfg
->log
&& cfg
->log
->log
)
622 cfg
->log
->log(cfg
->log
->log_private
, DNET_LOG_ERROR
, "Error during node creation.\n");
624 if (cfg
->cb
&& cfg
->cb
->backend_cleanup
)
625 cfg
->cb
->backend_cleanup(cfg
->cb
->command_private
);
/*
 * NOTE(review): only the signature is present in this listing; the body is
 * not visible. Presumably reports whether node @n has been asked to exit -
 * confirm against the full source.
 */
629 int dnet_need_exit(struct dnet_node
*n
)
/*
 * NOTE(review): only the signature is present in this listing; the body is
 * not visible. Presumably marks node @n as needing to exit - confirm
 * against the full source.
 */
634 void dnet_set_need_exit(struct dnet_node
*n
)
/*
 * Release the resources of node @n that are visible in this listing: stop
 * the check thread, destroy the pthread attributes and the state lock,
 * clean up crypto, unlink every entry of the reconnect list, destroy the
 * statistics counters and the reconnect/group locks, drop the wait
 * structure and close the autodiscovery socket.
 *
 * NOTE(review): lines without identifiers are missing from this listing, so
 * freeing of the unlinked reconnect entries and any guard around close()
 * (the socket is initialized to -1 in dnet_node_alloc()) are not visible -
 * confirm.
 */
639 void dnet_node_cleanup_common_resources(struct dnet_node
*n
)
641 struct dnet_addr_storage
*it
, *atmp
;
644 dnet_check_thread_stop(n
);
648 pthread_attr_destroy(&n
->attr
);
650 pthread_mutex_destroy(&n
->state_lock
);
651 dnet_crypto_cleanup(n
);
/* The _safe iterator is used because entries are unlinked while walking the list. */
653 list_for_each_entry_safe(it
, atmp
, &n
->reconnect_list
, reconnect_entry
) {
654 list_del(&it
->reconnect_entry
);
657 dnet_counter_destroy(n
);
658 pthread_mutex_destroy(&n
->reconnect_lock
);
659 pthread_mutex_destroy(&n
->group_lock
);
661 dnet_wait_put(n
->wait
);
665 close(n
->autodiscovery_socket
);
/*
 * Destroy node @n: log the event at debug level and release its shared
 * resources through dnet_node_cleanup_common_resources().
 *
 * NOTE(review): any further teardown (for example freeing @n itself) is not
 * visible in this listing.
 */
668 void dnet_node_destroy(struct dnet_node
*n
)
670 dnet_log(n
, DNET_LOG_DEBUG
, "Destroying node at %s, st: %p.\n",
671 dnet_dump_node(n
), n
->st
);
673 dnet_node_cleanup_common_resources(n
);
/*
 * Replace the node's group list with the @group_num entries of @groups.
 * The two leading checks catch inconsistent arguments (a group array with a
 * zero count, or a non-zero count with no array). A private array of
 * @group_num ints is allocated, a loop runs over its elements, and
 * n->group_num is updated under n->group_lock.
 *
 * NOTE(review): the statements executed by the argument checks, the loop
 * body, the pointer swap inside the locked section and the return values
 * are not visible in this listing.
 */
678 int dnet_node_set_groups(struct dnet_node
*n
, int *groups
, int group_num
)
682 if (groups
&& !group_num
)
684 if (group_num
&& !groups
)
687 g
= malloc(group_num
* sizeof(int));
691 for (i
=0; i
<group_num
; ++i
)
694 pthread_mutex_lock(&n
->group_lock
);
698 n
->group_num
= group_num
;
699 pthread_mutex_unlock(&n
->group_lock
);
/*
 * Override the node's timeouts: @wait_timeout is stored as the seconds part
 * of n->wait_ts and @check_timeout as n->check_timeout. No lock is taken
 * and the values are not validated in the visible code.
 */
704 void dnet_set_timeouts(struct dnet_node
*n
, int wait_timeout
, int check_timeout
)
706 n
->wait_ts
.tv_sec
= wait_timeout
;
707 n
->check_timeout
= check_timeout
;