segfault/memleak on incorrect data fixed
[elliptics.git] / library / elliptics.h
blob5880b828df1d53e30503e627ad5d1b433140b464
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #ifndef __DNET_ELLIPTICS_H
17 #define __DNET_ELLIPTICS_H
19 #include <sys/time.h>
20 #include <sys/socket.h>
21 #include <sys/epoll.h>
23 #include <errno.h>
24 #include <netdb.h>
25 #include <string.h>
26 #include <pthread.h>
27 #include <time.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
32 #include <eblob/blob.h>
34 #ifndef HAVE_UCHAR
35 typedef unsigned char u_char;
36 typedef unsigned short u_short;
37 #endif
39 #include "list.h"
41 #undef offsetof
42 #include "rbtree.h"
44 #include "atomic.h"
45 #include "lock.h"
47 #include "elliptics/packet.h"
48 #include "elliptics/interface.h"
50 #ifdef __cplusplus
51 extern "C" {
52 #endif
54 #ifndef __unused
55 #define __unused __attribute__ ((unused))
56 #endif
58 struct dnet_node;
59 struct dnet_group;
60 struct dnet_net_state;
62 #define dnet_log(n, level, format, a...) do { if (n->log && (n->log->log_level >= level)) dnet_log_raw(n, level, format, ##a); } while (0)
63 #define dnet_log_err(n, f, a...) dnet_log(n, DNET_LOG_ERROR, f ": %s [%d].\n", ##a, strerror(errno), errno)
65 struct dnet_io_req {
66 struct list_head req_entry;
68 struct dnet_net_state *st;
70 void *header;
71 size_t hsize;
73 void *data;
74 size_t dsize;
76 int close_on_exit;
77 int fd;
78 off_t local_offset;
79 size_t fsize;
83 * Currently executed network state machine:
84 * receives and sends command and data.
87 /* Reading a command */
88 #define DNET_IO_CMD (1<<0)
90 /* Attached data should be discarded */
91 #define DNET_IO_DROP (1<<1)
93 #define DNET_STATE_MAX_WEIGHT (1024 * 10)
95 struct dnet_net_state
97 struct list_head state_entry;
98 struct list_head storage_state_entry;
100 struct dnet_node *n;
102 atomic_t refcnt;
103 int read_s, write_s;
105 int need_exit;
107 int stall;
109 int __join_state;
111 struct dnet_addr addr;
113 int (* process)(struct dnet_net_state *st, struct epoll_event *ev);
115 struct dnet_cmd rcv_cmd;
116 uint64_t rcv_offset;
117 uint64_t rcv_end;
118 unsigned int rcv_flags;
119 void *rcv_data;
121 int epoll_fd;
122 size_t send_offset;
123 pthread_mutex_t send_lock;
124 struct list_head send_list;
126 pthread_mutex_t trans_lock;
127 struct rb_root trans_root;
128 struct list_head trans_list;
131 int la;
132 unsigned long long free;
133 float weight;
134 long median_read_time;
136 struct dnet_idc *idc;
138 struct dnet_stat_count stat[__DNET_CMD_MAX];
141 struct dnet_idc;
142 struct dnet_state_id {
143 struct dnet_raw_id raw;
144 struct dnet_idc *idc;
147 struct dnet_idc {
148 struct dnet_net_state *st;
149 struct dnet_group *group;
150 int id_num;
151 struct dnet_state_id ids[];
154 int dnet_idc_create(struct dnet_net_state *st, int group_id, struct dnet_raw_id *ids, int id_num);
155 void dnet_idc_destroy_nolock(struct dnet_net_state *st);
157 struct dnet_net_state *dnet_state_create(struct dnet_node *n,
158 int group_id, struct dnet_raw_id *ids, int id_num,
159 struct dnet_addr *addr, int s, int *errp, int join,
160 int (* process)(struct dnet_net_state *st, struct epoll_event *ev));
162 void dnet_state_reset(struct dnet_net_state *st);
163 void dnet_state_remove_nolock(struct dnet_net_state *st);
165 struct dnet_net_state *dnet_state_search_by_addr(struct dnet_node *n, struct dnet_addr *addr);
166 struct dnet_net_state *dnet_state_get_first(struct dnet_node *n, struct dnet_id *id);
167 struct dnet_net_state *dnet_state_search_nolock(struct dnet_node *n, struct dnet_id *id);
168 struct dnet_net_state *dnet_node_state(struct dnet_node *n);
170 void dnet_node_cleanup_common_resources(struct dnet_node *n);
172 int dnet_search_range(struct dnet_node *n, struct dnet_id *id,
173 struct dnet_raw_id *start, struct dnet_raw_id *next);
175 int dnet_recv_route_list(struct dnet_net_state *st);
177 void dnet_state_destroy(struct dnet_net_state *st);
179 void dnet_schedule_command(struct dnet_net_state *st);
181 int dnet_schedule_send(struct dnet_net_state *st);
182 int dnet_schedule_recv(struct dnet_net_state *st);
184 void dnet_unschedule_send(struct dnet_net_state *st);
185 void dnet_unschedule_recv(struct dnet_net_state *st);
187 int dnet_setup_control_nolock(struct dnet_net_state *st);
189 int dnet_add_reconnect_state(struct dnet_node *n, struct dnet_addr *addr, unsigned int join_state);
191 static inline struct dnet_net_state *dnet_state_get(struct dnet_net_state *st)
193 atomic_inc(&st->refcnt);
194 return st;
196 static inline void dnet_state_put(struct dnet_net_state *st)
199 * State can be NULL here when we just want to kick IO thread.
201 if (st && atomic_dec_and_test(&st->refcnt))
202 dnet_state_destroy(st);
205 struct dnet_wait
207 pthread_cond_t wait;
208 pthread_mutex_t wait_lock;
209 int cond;
210 int status;
212 void *ret;
213 int size;
215 atomic_t refcnt;
218 #define dnet_wait_event(w, condition, wts) \
219 ({ \
220 int __err = 0; \
221 struct timespec __ts; \
222 struct timeval __tv; \
223 gettimeofday(&__tv, NULL); \
224 __ts.tv_nsec = __tv.tv_usec * 1000 + (wts)->tv_nsec; \
225 __ts.tv_sec = __tv.tv_sec + (wts)->tv_sec; \
226 pthread_mutex_lock(&(w)->wait_lock); \
227 while (!(condition) && !__err) \
228 __err = pthread_cond_timedwait(&(w)->wait, &(w)->wait_lock, &__ts); \
229 pthread_mutex_unlock(&(w)->wait_lock); \
230 -__err; \
233 #define dnet_wakeup(w, task) \
234 ({ \
235 pthread_mutex_lock(&(w)->wait_lock); \
236 task; \
237 pthread_cond_broadcast(&(w)->wait); \
238 pthread_mutex_unlock(&(w)->wait_lock); \
241 struct dnet_wait *dnet_wait_alloc(int cond);
242 void dnet_wait_destroy(struct dnet_wait *w);
244 static inline struct dnet_wait *dnet_wait_get(struct dnet_wait *w)
246 atomic_inc(&w->refcnt);
247 return w;
250 static inline void dnet_wait_put(struct dnet_wait *w)
252 if (atomic_dec_and_test(&w->refcnt))
253 dnet_wait_destroy(w);
256 struct dnet_notify_bucket
258 struct list_head notify_list;
259 pthread_rwlock_t notify_lock;
262 int dnet_update_notify(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data);
264 int dnet_notify_add(struct dnet_net_state *st, struct dnet_cmd *cmd);
265 int dnet_notify_remove(struct dnet_net_state *st, struct dnet_cmd *cmd);
267 int dnet_notify_init(struct dnet_node *n);
268 void dnet_notify_exit(struct dnet_node *n);
270 struct dnet_group
272 struct list_head group_entry;
274 unsigned int group_id;
276 struct list_head state_list;
278 atomic_t refcnt;
280 int id_num;
281 struct dnet_state_id *ids;
284 static inline struct dnet_group *dnet_group_get(struct dnet_group *g)
286 atomic_inc(&g->refcnt);
287 return g;
290 void dnet_group_destroy(struct dnet_group *g);
291 static inline void dnet_group_put(struct dnet_group *g)
293 if (g && atomic_dec_and_test(&g->refcnt))
294 dnet_group_destroy(g);
297 struct dnet_transform
299 void *priv;
301 int (* transform)(void *priv, const void *src, uint64_t size,
302 void *dst, unsigned int *dsize, unsigned int flags);
305 int dnet_crypto_init(struct dnet_node *n, void *ns, int nsize);
306 void dnet_crypto_cleanup(struct dnet_node *n);
308 struct dnet_net_io {
309 int epoll_fd;
310 pthread_t tid;
311 struct dnet_node *n;
314 enum dnet_work_io_mode {
315 DNET_WORK_IO_MODE_BLOCKING = 0,
316 DNET_WORK_IO_MODE_NONBLOCKING,
317 DNET_WORK_IO_MODE_EXEC_BLOCKING,
320 struct dnet_work_pool;
321 struct dnet_work_io {
322 struct list_head wio_entry;
323 int thread_index;
324 pthread_t tid;
325 struct dnet_work_pool *pool;
328 struct dnet_work_pool {
329 struct dnet_node *n;
330 int mode;
331 int num;
332 atomic_t avail;
333 struct list_head list;
334 pthread_mutex_t lock;
335 pthread_cond_t wait;
336 struct list_head wio_list;
339 struct dnet_io {
340 int need_exit;
342 int net_thread_num, net_thread_pos;
343 struct dnet_net_io *net;
345 struct dnet_work_pool *recv_pool;
346 struct dnet_work_pool *recv_pool_nb;
349 int dnet_state_accept_process(struct dnet_net_state *st, struct epoll_event *ev);
350 int dnet_state_net_process(struct dnet_net_state *st, struct epoll_event *ev);
351 int dnet_io_init(struct dnet_node *n, struct dnet_config *cfg);
352 void dnet_io_exit(struct dnet_node *n);
354 void dnet_io_req_free(struct dnet_io_req *r);
356 struct dnet_locks {
357 int num;
358 pthread_mutex_t lock[0];
361 void dnet_locks_destroy(struct dnet_node *n);
362 int dnet_locks_init(struct dnet_node *n, int num);
363 void dnet_oplock(struct dnet_node *n, struct dnet_id *key);
364 void dnet_opunlock(struct dnet_node *n, struct dnet_id *key);
365 int dnet_optrylock(struct dnet_node *n, struct dnet_id *key);
367 struct dnet_node
369 struct list_head check_entry;
371 struct dnet_transform transform;
373 int need_exit;
375 int autodiscovery_socket;
376 struct dnet_addr autodiscovery_addr;
378 struct dnet_id id;
380 int flags;
381 int ro;
383 pthread_attr_t attr;
385 struct dnet_addr addr;
386 int sock_type, proto, family;
388 pthread_mutex_t state_lock;
389 struct list_head group_list;
391 /* hosts client states, i.e. those who didn't join network */
392 struct list_head empty_state_list;
394 /* hosts all states added to given node */
395 struct list_head storage_state_list;
397 atomic_t trans;
399 struct dnet_net_state *st;
401 int error;
403 struct dnet_log *log;
405 struct dnet_wait *wait;
406 struct timespec wait_ts;
408 struct dnet_io *io;
410 int check_in_progress;
411 long check_timeout;
412 pthread_t check_tid;
413 long stall_count;
415 pthread_t monitor_tid;
416 int monitor_fd;
418 char *temp_meta_env;
420 struct dnet_backend_callbacks *cb;
422 unsigned int notify_hash_size;
423 struct dnet_notify_bucket *notify_hash;
425 pthread_mutex_t reconnect_lock;
426 struct list_head reconnect_list;
428 struct dnet_lock counters_lock;
429 struct dnet_stat_count counters[__DNET_CNTR_MAX];
431 int bg_ionice_class;
432 int bg_ionice_prio;
433 int removal_delay;
435 char cookie[DNET_AUTH_COOKIE_SIZE];
437 void *srw;
439 int server_prio;
440 int client_prio;
442 struct dnet_locks *locks;
444 size_t cache_size;
445 void *cache;
449 struct dnet_session {
450 struct dnet_node *node;
451 int group_num;
452 int *groups;
455 static inline int dnet_counter_init(struct dnet_node *n)
457 memset(&n->counters, 0, __DNET_CNTR_MAX * sizeof(struct dnet_stat_count));
458 return dnet_lock_init(&n->counters_lock);
461 static inline void dnet_counter_destroy(struct dnet_node *n)
463 return dnet_lock_destroy(&n->counters_lock);
466 static inline void dnet_counter_inc(struct dnet_node *n, int counter, int err)
468 if (counter >= __DNET_CNTR_MAX)
469 counter = DNET_CNTR_UNKNOWN;
471 dnet_lock_lock(&n->counters_lock);
472 if (!err)
473 n->counters[counter].count++;
474 else
475 n->counters[counter].err++;
476 dnet_lock_unlock(&n->counters_lock);
478 dnet_log(n, DNET_LOG_DEBUG, "Incrementing counter: %d, err: %d, value is: %llu %llu.\n",
479 counter, err,
480 (unsigned long long)n->counters[counter].count,
481 (unsigned long long)n->counters[counter].err);
484 static inline void dnet_counter_set(struct dnet_node *n, int counter, int err, int64_t val)
486 if (counter >= __DNET_CNTR_MAX)
487 counter = DNET_CNTR_UNKNOWN;
489 dnet_lock_lock(&n->counters_lock);
490 if (!err)
491 n->counters[counter].count = val;
492 else
493 n->counters[counter].err = val;
494 dnet_lock_unlock(&n->counters_lock);
497 static inline char *dnet_dump_node(struct dnet_node *n)
499 static char buf[128];
501 return dnet_server_convert_dnet_addr_raw(&n->addr, buf, sizeof(buf));
504 struct dnet_trans;
505 int __attribute__((weak)) dnet_process_cmd_raw(struct dnet_net_state *st, struct dnet_cmd *cmd, void *data);
506 int dnet_process_recv(struct dnet_net_state *st, struct dnet_io_req *r);
508 int dnet_recv(struct dnet_net_state *st, void *data, unsigned int size);
509 int dnet_sendfile(struct dnet_net_state *st, int fd, uint64_t *offset, uint64_t size);
511 int dnet_send_request(struct dnet_net_state *st, struct dnet_io_req *r);
513 struct dnet_config;
514 int dnet_socket_create(struct dnet_node *n, struct dnet_config *cfg, struct dnet_addr *addr, int listening);
515 int dnet_socket_create_addr(struct dnet_node *n, int sock_type, int proto, int family,
516 struct sockaddr *sa, unsigned int salen, int listening);
518 void dnet_set_sockopt(int s);
519 void dnet_sock_close(int s);
521 enum dnet_join_state {
522 DNET_JOIN = 1, /* Node joined the network */
523 DNET_WANT_RECONNECT, /* State must be reconnected, when remote peer failed */
526 int __attribute__((weak)) dnet_state_join_nolock(struct dnet_net_state *st);
528 struct dnet_trans
530 struct rb_node trans_entry;
531 struct list_head trans_list_entry;
533 struct timeval time, start;
535 struct dnet_net_state *orig; /* only for forward */
537 struct dnet_net_state *st;
538 uint64_t trans, rcv_trans;
539 struct dnet_cmd cmd;
541 atomic_t refcnt;
543 int command; /* main command this transaction carries */
545 void *priv;
546 int (* complete)(struct dnet_net_state *st,
547 struct dnet_cmd *cmd,
548 void *priv);
551 void dnet_trans_destroy(struct dnet_trans *t);
552 struct dnet_trans *dnet_trans_alloc(struct dnet_node *n, uint64_t size);
553 int dnet_trans_alloc_send_state(struct dnet_net_state *st, struct dnet_trans_control *ctl);
554 int dnet_trans_timer_setup(struct dnet_trans *t);
556 static inline struct dnet_trans *dnet_trans_get(struct dnet_trans *t)
558 atomic_inc(&t->refcnt);
559 return t;
562 static inline void dnet_trans_put(struct dnet_trans *t)
564 if (t && atomic_dec_and_test(&t->refcnt))
565 dnet_trans_destroy(t);
568 int dnet_trans_insert_nolock(struct rb_root *root, struct dnet_trans *a);
569 void dnet_trans_remove(struct dnet_trans *t);
570 void dnet_trans_remove_nolock(struct rb_root *root, struct dnet_trans *t);
571 struct dnet_trans *dnet_trans_search(struct rb_root *root, uint64_t trans);
573 int dnet_trans_send(struct dnet_trans *t, struct dnet_io_req *req);
575 int dnet_recv_list(struct dnet_node *n, struct dnet_net_state *st);
577 ssize_t dnet_send_fd(struct dnet_net_state *st, void *header, uint64_t hsize,
578 int fd, uint64_t offset, uint64_t dsize, int close_on_exit);
579 ssize_t dnet_send_data(struct dnet_net_state *st, void *header, uint64_t hsize, void *data, uint64_t dsize);
580 ssize_t dnet_send(struct dnet_net_state *st, void *data, uint64_t size);
581 ssize_t dnet_send_nolock(struct dnet_net_state *st, void *data, uint64_t size);
583 struct dnet_io_completion
585 struct dnet_wait *wait;
586 char *file;
587 uint64_t offset;
590 struct dnet_addr_storage
592 int reconnect_time, reconnect_time_max;
593 struct list_head reconnect_entry;
594 struct dnet_addr addr;
595 unsigned int __join_state;
599 * Returns true if t1 is before than t2.
601 static inline int dnet_time_before(struct timespec *t1, struct timespec *t2)
603 if ((long)(t1->tv_sec - t2->tv_sec) < 0)
604 return 1;
606 if ((long)(t2->tv_sec - t1->tv_sec) < 0)
607 return 0;
609 return ((long)(t1->tv_nsec - t2->tv_nsec) < 0);
611 #define dnet_time_after(t2, t1) dnet_time_before(t1, t2)
613 int dnet_check_thread_start(struct dnet_node *n);
614 void dnet_check_thread_stop(struct dnet_node *n);
615 int dnet_try_reconnect(struct dnet_node *n);
617 #define DNET_CHECK_TYPE_COPIES_HISTORY 1
618 #define DNET_CHECK_TYPE_COPIES_FULL 2
619 #define DNET_CHECK_TYPE_MERGE 3
620 #define DNET_CHECK_TYPE_DELETE 4
622 #define DNET_BULK_IDS_SIZE 1000
623 #define DNET_BULK_CHECK_PING 100
624 #define DNET_BULK_STATES_ALLOC_STEP 10
625 #define DNET_BULK_META_UPD_SIZE 1000
627 struct dnet_bulk_id
629 struct dnet_raw_id id;
630 struct dnet_meta_update last_update;
631 } __attribute__ ((packed));
633 struct dnet_bulk_state
635 struct dnet_addr addr;
636 pthread_mutex_t state_lock;
637 int num;
638 struct dnet_bulk_id *ids;
641 struct dnet_bulk_array
643 int num;
644 struct dnet_bulk_state *states;
645 atomic_t refcnt;
648 static inline int dnet_compare_bulk_state(const void *k1, const void *k2)
650 const struct dnet_bulk_state *st1 = (const struct dnet_bulk_state *)k1;
651 const struct dnet_bulk_state *st2 = (const struct dnet_bulk_state *)k2;
653 if (st1->addr.addr_len > st2->addr.addr_len)
654 return 1;
655 if (st1->addr.addr_len < st2->addr.addr_len)
656 return -1;
657 return memcmp(st1->addr.addr, st2->addr.addr, st1->addr.addr_len);
660 struct dnet_check_temp_db {
661 struct eblob_backend *b;
662 struct eblob_log log;
663 atomic_t refcnt;
666 struct dnet_check_params {
667 struct dnet_check_temp_db *db;
668 int group_num;
669 int *groups;
673 static inline struct dnet_check_temp_db * dnet_check_temp_db_get(struct dnet_check_temp_db *db) {
674 atomic_inc(&db->refcnt);
675 return db;
678 static inline void dnet_check_temp_db_put(struct dnet_check_temp_db *db) {
679 if (atomic_dec_and_test(&db->refcnt)) {
680 eblob_remove_blobs(db->b);
681 eblob_cleanup(db->b);
682 free(db);
686 int dnet_check(struct dnet_node *n, struct dnet_meta_container *mc, struct dnet_bulk_array *bulk_array, int check_copies, struct dnet_check_params *params);
687 int dnet_db_list(struct dnet_net_state *st, struct dnet_cmd *cmd);
688 int dnet_cmd_bulk_check(struct dnet_net_state *orig, struct dnet_cmd *cmd, void *data);
689 int dnet_request_bulk_check(struct dnet_node *n, struct dnet_bulk_state *state, struct dnet_check_params *params);
691 struct dnet_meta_update * dnet_get_meta_update(struct dnet_node *n, struct dnet_meta_container *mc,
692 struct dnet_meta_update *meta_update);
694 int dnet_update_ts_metadata(struct eblob_backend *b, struct dnet_raw_id *id, uint64_t flags_set, uint64_t flags_clear);
695 int dnet_update_ts_metadata_raw(struct dnet_meta_container *mc, uint64_t flags_set, uint64_t flags_clear);
697 int dnet_process_meta(struct dnet_net_state *st, struct dnet_cmd *cmd, struct dnet_io_attr *io);
698 void dnet_convert_metadata(struct dnet_node *n __unused, void *data, int size);
700 void dnet_monitor_exit(struct dnet_node *n);
701 int dnet_monitor_init(struct dnet_node *n, struct dnet_config *cfg);
703 int dnet_set_name(char *name);
704 int dnet_ioprio_set(long pid, int class_id, int prio);
705 int dnet_ioprio_get(long pid);
707 struct dnet_map_fd {
708 int fd;
709 uint64_t offset, size;
711 void *data;
713 uint64_t mapped_size;
714 void *mapped_data;
717 int dnet_data_map(struct dnet_map_fd *map);
718 void dnet_data_unmap(struct dnet_map_fd *map);
720 void *dnet_read_data_wait_raw(struct dnet_session *s, struct dnet_id *id, struct dnet_io_attr *io, int cmd, uint64_t cflags, int *errp);
722 int dnet_srw_init(struct dnet_node *n, struct dnet_config *cfg);
723 void dnet_srw_cleanup(struct dnet_node *n);
724 int dnet_cmd_exec_raw(struct dnet_net_state *st, struct dnet_cmd *cmd, struct sph *header, const void *data);
726 int dnet_cache_init(struct dnet_node *n);
727 void dnet_cache_cleanup(struct dnet_node *n);
728 int dnet_cmd_cache_io(struct dnet_net_state *st, struct dnet_cmd *cmd, struct dnet_io_attr *io, char *data);
730 int __attribute__((weak)) dnet_remove_local(struct dnet_node *n, struct dnet_id *id);
732 int dnet_discovery(struct dnet_node *n);
734 #ifdef __cplusplus
736 #endif
738 #endif /* __DNET_ELLIPTICS_H */