2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #ifndef __DNET_ELLIPTICS_H
17 #define __DNET_ELLIPTICS_H
20 #include <sys/socket.h>
21 #include <sys/epoll.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
32 #include <eblob/blob.h>
35 typedef unsigned char u_char
;
36 typedef unsigned short u_short
;
47 #include "elliptics/packet.h"
48 #include "elliptics/interface.h"
/*
 * Mark a parameter or variable as intentionally unused to silence compiler
 * warnings. Some platform headers (e.g. BSD/macOS <sys/cdefs.h>) already
 * define __unused, so only provide it when it is not defined yet.
 */
#ifndef __unused
#define __unused __attribute__ ((unused))
#endif
60 struct dnet_net_state
;
/*
 * Log through dnet_log_raw() when node @n has a logger whose log_level is at
 * least @level; otherwise do nothing.
 * @n and @level are parenthesised so that expressions such as "A | B" or
 * "cond ? a : b" are compared as a single operand.
 */
#define dnet_log(n, level, format, a...)					\
	do {									\
		if ((n)->log && ((n)->log->log_level >= (level)))		\
			dnet_log_raw((n), (level), format, ##a);		\
	} while (0)
/*
 * Log an error-level message built from @f and its arguments, followed by
 * ": <strerror(errno)> [<errno>].". @f must be a string literal because it is
 * concatenated with that suffix at compile time.
 *
 * errno is captured once, before any of the caller's arguments are
 * evaluated. The evaluation order of function arguments is unspecified, so
 * reading errno in the same argument list as calls that may modify it is
 * unreliable.
 */
#define dnet_log_err(n, f, a...)						\
	do {									\
		int dnet_log_err_errno = errno;					\
		dnet_log(n, DNET_LOG_ERROR, f ": %s [%d].\n", ##a,		\
			 strerror(dnet_log_err_errno), dnet_log_err_errno);	\
	} while (0)
66 struct list_head req_entry
;
68 struct dnet_net_state
*st
;
83 * Currently executed network state machine:
84 * receives and sends commands and data.
87 /* Reading a command */
88 #define DNET_IO_CMD (1<<0)
90 /* Attached data should be discarded */
91 #define DNET_IO_DROP (1<<1)
93 #define DNET_STATE_MAX_WEIGHT (1024 * 10)
97 struct list_head state_entry
;
98 struct list_head storage_state_entry
;
111 struct dnet_addr addr
;
113 int (* process
)(struct dnet_net_state
*st
, struct epoll_event
*ev
);
115 struct dnet_cmd rcv_cmd
;
118 unsigned int rcv_flags
;
123 pthread_mutex_t send_lock
;
124 struct list_head send_list
;
126 pthread_mutex_t trans_lock
;
127 struct rb_root trans_root
;
128 struct list_head trans_list
;
132 unsigned long long free
;
134 long median_read_time
;
136 struct dnet_idc
*idc
;
138 struct dnet_stat_count stat
[__DNET_CMD_MAX
];
142 struct dnet_state_id
{
143 struct dnet_raw_id raw
;
144 struct dnet_idc
*idc
;
148 struct dnet_net_state
*st
;
149 struct dnet_group
*group
;
151 struct dnet_state_id ids
[];
154 int dnet_idc_create(struct dnet_net_state
*st
, int group_id
, struct dnet_raw_id
*ids
, int id_num
);
155 void dnet_idc_destroy_nolock(struct dnet_net_state
*st
);
157 struct dnet_net_state
*dnet_state_create(struct dnet_node
*n
,
158 int group_id
, struct dnet_raw_id
*ids
, int id_num
,
159 struct dnet_addr
*addr
, int s
, int *errp
, int join
,
160 int (* process
)(struct dnet_net_state
*st
, struct epoll_event
*ev
));
162 void dnet_state_reset(struct dnet_net_state
*st
);
163 void dnet_state_remove_nolock(struct dnet_net_state
*st
);
165 struct dnet_net_state
*dnet_state_search_by_addr(struct dnet_node
*n
, struct dnet_addr
*addr
);
166 struct dnet_net_state
*dnet_state_get_first(struct dnet_node
*n
, struct dnet_id
*id
);
167 struct dnet_net_state
*dnet_state_search_nolock(struct dnet_node
*n
, struct dnet_id
*id
);
168 struct dnet_net_state
*dnet_node_state(struct dnet_node
*n
);
170 void dnet_node_cleanup_common_resources(struct dnet_node
*n
);
172 int dnet_search_range(struct dnet_node
*n
, struct dnet_id
*id
,
173 struct dnet_raw_id
*start
, struct dnet_raw_id
*next
);
175 int dnet_recv_route_list(struct dnet_net_state
*st
);
177 void dnet_state_destroy(struct dnet_net_state
*st
);
179 void dnet_schedule_command(struct dnet_net_state
*st
);
181 int dnet_schedule_send(struct dnet_net_state
*st
);
182 int dnet_schedule_recv(struct dnet_net_state
*st
);
184 void dnet_unschedule_send(struct dnet_net_state
*st
);
185 void dnet_unschedule_recv(struct dnet_net_state
*st
);
187 int dnet_setup_control_nolock(struct dnet_net_state
*st
);
189 int dnet_add_reconnect_state(struct dnet_node
*n
, struct dnet_addr
*addr
, unsigned int join_state
);
191 static inline struct dnet_net_state
*dnet_state_get(struct dnet_net_state
*st
)
193 atomic_inc(&st
->refcnt
);
196 static inline void dnet_state_put(struct dnet_net_state
*st
)
199 * State can be NULL here when we just want to kick IO thread.
201 if (st
&& atomic_dec_and_test(&st
->refcnt
))
202 dnet_state_destroy(st
);
208 pthread_mutex_t wait_lock
;
/*
 * dnet_wait_event(w, condition, wts): lock (w)->wait_lock and wait on
 * (w)->wait with pthread_cond_timedwait() until @condition becomes true or
 * the wait call returns non-zero. The absolute deadline is the current
 * gettimeofday() time plus the relative timeout *(wts).
 *
 * NOTE(review): the lines that open the macro body, declare/initialise
 * __err, and produce/close the macro's result are not present here, so the
 * macro cannot compile as-is. Restore them from upstream.
 * NOTE(review): __ts.tv_nsec is not normalised. tv_usec * 1000 + wts->tv_nsec
 * can reach 1000000000 or more, and POSIX pthread_cond_timedwait() rejects
 * such an abstime with EINVAL. Carry the excess nanoseconds into tv_sec.
 * NOTE(review): assuming (w)->wait uses the default (CLOCK_REALTIME)
 * condition clock, wall-clock jumps change the effective timeout. Confirm
 * against the condattr used in dnet_wait_alloc().
 */
218 #define dnet_wait_event(w, condition, wts) \
221 struct timespec __ts; \
222 struct timeval __tv; \
223 gettimeofday(&__tv, NULL); \
224 __ts.tv_nsec = __tv.tv_usec * 1000 + (wts)->tv_nsec; \
225 __ts.tv_sec = __tv.tv_sec + (wts)->tv_sec; \
226 pthread_mutex_lock(&(w)->wait_lock); \
227 while (!(condition) && !__err) \
228 __err = pthread_cond_timedwait(&(w)->wait, &(w)->wait_lock, &__ts); \
229 pthread_mutex_unlock(&(w)->wait_lock); \
/*
 * Run @task (typically an assignment that makes a waiter's condition true)
 * while holding (w)->wait_lock, then wake every thread blocked on (w)->wait.
 * Running @task under the lock means a waiter cannot test its condition
 * between the update and the broadcast and miss the wakeup.
 */
#define dnet_wakeup(w, task)						\
	do {								\
		pthread_mutex_lock(&(w)->wait_lock);			\
		task;							\
		pthread_cond_broadcast(&(w)->wait);			\
		pthread_mutex_unlock(&(w)->wait_lock);			\
	} while (0)
241 struct dnet_wait
*dnet_wait_alloc(int cond
);
242 void dnet_wait_destroy(struct dnet_wait
*w
);
244 static inline struct dnet_wait
*dnet_wait_get(struct dnet_wait
*w
)
246 atomic_inc(&w
->refcnt
);
250 static inline void dnet_wait_put(struct dnet_wait
*w
)
252 if (atomic_dec_and_test(&w
->refcnt
))
253 dnet_wait_destroy(w
);
256 struct dnet_notify_bucket
258 struct list_head notify_list
;
259 pthread_rwlock_t notify_lock
;
262 int dnet_update_notify(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data
);
264 int dnet_notify_add(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
);
265 int dnet_notify_remove(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
);
267 int dnet_notify_init(struct dnet_node
*n
);
268 void dnet_notify_exit(struct dnet_node
*n
);
272 struct list_head group_entry
;
274 unsigned int group_id
;
276 struct list_head state_list
;
281 struct dnet_state_id
*ids
;
284 static inline struct dnet_group
*dnet_group_get(struct dnet_group
*g
)
286 atomic_inc(&g
->refcnt
);
290 void dnet_group_destroy(struct dnet_group
*g
);
291 static inline void dnet_group_put(struct dnet_group
*g
)
293 if (g
&& atomic_dec_and_test(&g
->refcnt
))
294 dnet_group_destroy(g
);
297 struct dnet_transform
301 int (* transform
)(void *priv
, const void *src
, uint64_t size
,
302 void *dst
, unsigned int *dsize
, unsigned int flags
);
305 int dnet_crypto_init(struct dnet_node
*n
, void *ns
, int nsize
);
306 void dnet_crypto_cleanup(struct dnet_node
*n
);
314 enum dnet_work_io_mode
{
315 DNET_WORK_IO_MODE_BLOCKING
= 0,
316 DNET_WORK_IO_MODE_NONBLOCKING
,
317 DNET_WORK_IO_MODE_EXEC_BLOCKING
,
320 struct dnet_work_pool
;
321 struct dnet_work_io
{
322 struct list_head wio_entry
;
325 struct dnet_work_pool
*pool
;
328 struct dnet_work_pool
{
333 struct list_head list
;
334 pthread_mutex_t lock
;
336 struct list_head wio_list
;
342 int net_thread_num
, net_thread_pos
;
343 struct dnet_net_io
*net
;
345 struct dnet_work_pool
*recv_pool
;
346 struct dnet_work_pool
*recv_pool_nb
;
349 int dnet_state_accept_process(struct dnet_net_state
*st
, struct epoll_event
*ev
);
350 int dnet_state_net_process(struct dnet_net_state
*st
, struct epoll_event
*ev
);
351 int dnet_io_init(struct dnet_node
*n
, struct dnet_config
*cfg
);
352 void dnet_io_exit(struct dnet_node
*n
);
354 void dnet_io_req_free(struct dnet_io_req
*r
);
358 pthread_mutex_t lock
[0];
361 void dnet_locks_destroy(struct dnet_node
*n
);
362 int dnet_locks_init(struct dnet_node
*n
, int num
);
363 void dnet_oplock(struct dnet_node
*n
, struct dnet_id
*key
);
364 void dnet_opunlock(struct dnet_node
*n
, struct dnet_id
*key
);
365 int dnet_optrylock(struct dnet_node
*n
, struct dnet_id
*key
);
369 struct list_head check_entry
;
371 struct dnet_transform transform
;
375 int autodiscovery_socket
;
376 struct dnet_addr autodiscovery_addr
;
385 struct dnet_addr addr
;
386 int sock_type
, proto
, family
;
388 pthread_mutex_t state_lock
;
389 struct list_head group_list
;
391 /* holds client states, i.e. those that did not join the network */
392 struct list_head empty_state_list
;
394 /* holds all states added to the given node */
395 struct list_head storage_state_list
;
399 struct dnet_net_state
*st
;
403 struct dnet_log
*log
;
405 struct dnet_wait
*wait
;
406 struct timespec wait_ts
;
410 int check_in_progress
;
415 pthread_t monitor_tid
;
420 struct dnet_backend_callbacks
*cb
;
422 unsigned int notify_hash_size
;
423 struct dnet_notify_bucket
*notify_hash
;
425 pthread_mutex_t reconnect_lock
;
426 struct list_head reconnect_list
;
428 struct dnet_lock counters_lock
;
429 struct dnet_stat_count counters
[__DNET_CNTR_MAX
];
435 char cookie
[DNET_AUTH_COOKIE_SIZE
];
442 struct dnet_locks
*locks
;
449 struct dnet_session
{
450 struct dnet_node
*node
;
455 static inline int dnet_counter_init(struct dnet_node
*n
)
457 memset(&n
->counters
, 0, __DNET_CNTR_MAX
* sizeof(struct dnet_stat_count
));
458 return dnet_lock_init(&n
->counters_lock
);
461 static inline void dnet_counter_destroy(struct dnet_node
*n
)
463 return dnet_lock_destroy(&n
->counters_lock
);
466 static inline void dnet_counter_inc(struct dnet_node
*n
, int counter
, int err
)
468 if (counter
>= __DNET_CNTR_MAX
)
469 counter
= DNET_CNTR_UNKNOWN
;
471 dnet_lock_lock(&n
->counters_lock
);
473 n
->counters
[counter
].count
++;
475 n
->counters
[counter
].err
++;
476 dnet_lock_unlock(&n
->counters_lock
);
478 dnet_log(n
, DNET_LOG_DEBUG
, "Incrementing counter: %d, err: %d, value is: %llu %llu.\n",
480 (unsigned long long)n
->counters
[counter
].count
,
481 (unsigned long long)n
->counters
[counter
].err
);
484 static inline void dnet_counter_set(struct dnet_node
*n
, int counter
, int err
, int64_t val
)
486 if (counter
>= __DNET_CNTR_MAX
)
487 counter
= DNET_CNTR_UNKNOWN
;
489 dnet_lock_lock(&n
->counters_lock
);
491 n
->counters
[counter
].count
= val
;
493 n
->counters
[counter
].err
= val
;
494 dnet_lock_unlock(&n
->counters_lock
);
497 static inline char *dnet_dump_node(struct dnet_node
*n
)
499 static char buf
[128];
501 return dnet_server_convert_dnet_addr_raw(&n
->addr
, buf
, sizeof(buf
));
505 int __attribute__((weak
)) dnet_process_cmd_raw(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, void *data
);
506 int dnet_process_recv(struct dnet_net_state
*st
, struct dnet_io_req
*r
);
508 int dnet_recv(struct dnet_net_state
*st
, void *data
, unsigned int size
);
509 int dnet_sendfile(struct dnet_net_state
*st
, int fd
, uint64_t *offset
, uint64_t size
);
511 int dnet_send_request(struct dnet_net_state
*st
, struct dnet_io_req
*r
);
514 int dnet_socket_create(struct dnet_node
*n
, struct dnet_config
*cfg
, struct dnet_addr
*addr
, int listening
);
515 int dnet_socket_create_addr(struct dnet_node
*n
, int sock_type
, int proto
, int family
,
516 struct sockaddr
*sa
, unsigned int salen
, int listening
);
518 void dnet_set_sockopt(int s
);
519 void dnet_sock_close(int s
);
521 enum dnet_join_state
{
522 DNET_JOIN
= 1, /* Node joined the network */
523 DNET_WANT_RECONNECT
, /* State must be reconnected, when remote peer failed */
526 int __attribute__((weak
)) dnet_state_join_nolock(struct dnet_net_state
*st
);
530 struct rb_node trans_entry
;
531 struct list_head trans_list_entry
;
533 struct timeval time
, start
;
535 struct dnet_net_state
*orig
; /* only for forward */
537 struct dnet_net_state
*st
;
538 uint64_t trans
, rcv_trans
;
543 int command
; /* main command this transaction carries */
546 int (* complete
)(struct dnet_net_state
*st
,
547 struct dnet_cmd
*cmd
,
551 void dnet_trans_destroy(struct dnet_trans
*t
);
552 struct dnet_trans
*dnet_trans_alloc(struct dnet_node
*n
, uint64_t size
);
553 int dnet_trans_alloc_send_state(struct dnet_net_state
*st
, struct dnet_trans_control
*ctl
);
554 int dnet_trans_timer_setup(struct dnet_trans
*t
);
556 static inline struct dnet_trans
*dnet_trans_get(struct dnet_trans
*t
)
558 atomic_inc(&t
->refcnt
);
562 static inline void dnet_trans_put(struct dnet_trans
*t
)
564 if (t
&& atomic_dec_and_test(&t
->refcnt
))
565 dnet_trans_destroy(t
);
568 int dnet_trans_insert_nolock(struct rb_root
*root
, struct dnet_trans
*a
);
569 void dnet_trans_remove(struct dnet_trans
*t
);
570 void dnet_trans_remove_nolock(struct rb_root
*root
, struct dnet_trans
*t
);
571 struct dnet_trans
*dnet_trans_search(struct rb_root
*root
, uint64_t trans
);
573 int dnet_trans_send(struct dnet_trans
*t
, struct dnet_io_req
*req
);
575 int dnet_recv_list(struct dnet_node
*n
, struct dnet_net_state
*st
);
577 ssize_t
dnet_send_fd(struct dnet_net_state
*st
, void *header
, uint64_t hsize
,
578 int fd
, uint64_t offset
, uint64_t dsize
, int close_on_exit
);
579 ssize_t
dnet_send_data(struct dnet_net_state
*st
, void *header
, uint64_t hsize
, void *data
, uint64_t dsize
);
580 ssize_t
dnet_send(struct dnet_net_state
*st
, void *data
, uint64_t size
);
581 ssize_t
dnet_send_nolock(struct dnet_net_state
*st
, void *data
, uint64_t size
);
583 struct dnet_io_completion
585 struct dnet_wait
*wait
;
590 struct dnet_addr_storage
592 int reconnect_time
, reconnect_time_max
;
593 struct list_head reconnect_entry
;
594 struct dnet_addr addr
;
595 unsigned int __join_state
;
/*
 * Returns 1 if @t1 is earlier than @t2, 0 otherwise (including when equal).
 * Seconds are compared first; nanoseconds only break ties.
 * Differences are cast to long and tested for sign, in the style of the
 * kernel's wrap-tolerant time_before() helpers.
 */
static inline int dnet_time_before(struct timespec *t1, struct timespec *t2)
{
	if ((long)(t1->tv_sec - t2->tv_sec) < 0)
		return 1;

	if ((long)(t2->tv_sec - t1->tv_sec) < 0)
		return 0;

	return ((long)(t1->tv_nsec - t2->tv_nsec) < 0);
}
/* Returns 1 if @t2 is later than @t1. */
#define dnet_time_after(t2, t1) dnet_time_before(t1, t2)
613 int dnet_check_thread_start(struct dnet_node
*n
);
614 void dnet_check_thread_stop(struct dnet_node
*n
);
615 int dnet_try_reconnect(struct dnet_node
*n
);
617 #define DNET_CHECK_TYPE_COPIES_HISTORY 1
618 #define DNET_CHECK_TYPE_COPIES_FULL 2
619 #define DNET_CHECK_TYPE_MERGE 3
620 #define DNET_CHECK_TYPE_DELETE 4
622 #define DNET_BULK_IDS_SIZE 1000
623 #define DNET_BULK_CHECK_PING 100
624 #define DNET_BULK_STATES_ALLOC_STEP 10
625 #define DNET_BULK_META_UPD_SIZE 1000
629 struct dnet_raw_id id
;
630 struct dnet_meta_update last_update
;
631 } __attribute__ ((packed
));
633 struct dnet_bulk_state
635 struct dnet_addr addr
;
636 pthread_mutex_t state_lock
;
638 struct dnet_bulk_id
*ids
;
641 struct dnet_bulk_array
644 struct dnet_bulk_state
*states
;
648 static inline int dnet_compare_bulk_state(const void *k1
, const void *k2
)
650 const struct dnet_bulk_state
*st1
= (const struct dnet_bulk_state
*)k1
;
651 const struct dnet_bulk_state
*st2
= (const struct dnet_bulk_state
*)k2
;
653 if (st1
->addr
.addr_len
> st2
->addr
.addr_len
)
655 if (st1
->addr
.addr_len
< st2
->addr
.addr_len
)
657 return memcmp(st1
->addr
.addr
, st2
->addr
.addr
, st1
->addr
.addr_len
);
660 struct dnet_check_temp_db
{
661 struct eblob_backend
*b
;
662 struct eblob_log log
;
666 struct dnet_check_params
{
667 struct dnet_check_temp_db
*db
;
673 static inline struct dnet_check_temp_db
* dnet_check_temp_db_get(struct dnet_check_temp_db
*db
) {
674 atomic_inc(&db
->refcnt
);
678 static inline void dnet_check_temp_db_put(struct dnet_check_temp_db
*db
) {
679 if (atomic_dec_and_test(&db
->refcnt
)) {
680 eblob_remove_blobs(db
->b
);
681 eblob_cleanup(db
->b
);
686 int dnet_check(struct dnet_node
*n
, struct dnet_meta_container
*mc
, struct dnet_bulk_array
*bulk_array
, int check_copies
, struct dnet_check_params
*params
);
687 int dnet_db_list(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
);
688 int dnet_cmd_bulk_check(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
, void *data
);
689 int dnet_request_bulk_check(struct dnet_node
*n
, struct dnet_bulk_state
*state
, struct dnet_check_params
*params
);
691 struct dnet_meta_update
* dnet_get_meta_update(struct dnet_node
*n
, struct dnet_meta_container
*mc
,
692 struct dnet_meta_update
*meta_update
);
694 int dnet_update_ts_metadata(struct eblob_backend
*b
, struct dnet_raw_id
*id
, uint64_t flags_set
, uint64_t flags_clear
);
695 int dnet_update_ts_metadata_raw(struct dnet_meta_container
*mc
, uint64_t flags_set
, uint64_t flags_clear
);
697 int dnet_process_meta(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, struct dnet_io_attr
*io
);
698 void dnet_convert_metadata(struct dnet_node
*n __unused
, void *data
, int size
);
700 void dnet_monitor_exit(struct dnet_node
*n
);
701 int dnet_monitor_init(struct dnet_node
*n
, struct dnet_config
*cfg
);
703 int dnet_set_name(char *name
);
704 int dnet_ioprio_set(long pid
, int class_id
, int prio
);
705 int dnet_ioprio_get(long pid
);
709 uint64_t offset
, size
;
713 uint64_t mapped_size
;
717 int dnet_data_map(struct dnet_map_fd
*map
);
718 void dnet_data_unmap(struct dnet_map_fd
*map
);
720 void *dnet_read_data_wait_raw(struct dnet_session
*s
, struct dnet_id
*id
, struct dnet_io_attr
*io
, int cmd
, uint64_t cflags
, int *errp
);
722 int dnet_srw_init(struct dnet_node
*n
, struct dnet_config
*cfg
);
723 void dnet_srw_cleanup(struct dnet_node
*n
);
724 int dnet_cmd_exec_raw(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, struct sph
*header
, const void *data
);
726 int dnet_cache_init(struct dnet_node
*n
);
727 void dnet_cache_cleanup(struct dnet_node
*n
);
728 int dnet_cmd_cache_io(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, struct dnet_io_attr
*io
, char *data
);
730 int __attribute__((weak
)) dnet_remove_local(struct dnet_node
*n
, struct dnet_id
*id
);
732 int dnet_discovery(struct dnet_node
*n
);
738 #endif /* __DNET_ELLIPTICS_H */