2 * 2012+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
37 #include <leveldb/c.h>
/* Marks a parameter as intentionally unused via the GCC/Clang "unused" attribute. */
40 #define __unused __attribute__ ((unused))
/*
 * Per-instance state of the LevelDB storage backend.
 *
 * NOTE(review): other members are referenced elsewhere in this file
 * (db, env, path, sync, block_size, max_open_files, compression, smack)
 * but their declarations are not visible in this excerpt - confirm against
 * the full structure.
 */
43 struct leveldb_backend
46 struct eblob_log elog
;
/* metadata store handle; set from eblob_init() in dnet_leveldb_db_init() */
47 struct eblob_backend
*meta
;
/* handed to leveldb_options_set_write_buffer_size() at init time */
49 size_t write_buffer_size
;
/* handed to leveldb_options_set_block_restart_interval() at init time */
51 int block_restart_interval
;
/* LRU cache created by the "cache_size" config handler */
58 leveldb_cache_t
*cache
;
/* database, read and write option objects created in dnet_leveldb_config_init() */
59 leveldb_options_t
*options
;
60 leveldb_readoptions_t
*roptions
;
61 leveldb_writeoptions_t
*woptions
;
/* only referenced from commented-out code in the visible part of this file */
62 leveldb_comparator_t
*cmp
;
/*
 * Resolves idx to a path with smack_lookup(), opens that path read-only and
 * passes the descriptor together with idx->data_size to dnet_send_file_info().
 *
 * NOTE(review): this goes through s->smack and logs with a "SMACK" prefix -
 * presumably carried over from the smack backend; confirm it is still meant
 * to be built for LevelDB. Local declarations, error checks and the close of
 * fd are not visible in this excerpt.
 */
68 static int leveldb_backend_lookup_raw(struct leveldb_backend *s, struct index *idx, void *state, struct dnet_cmd *cmd)
73 err = smack_lookup(s->smack, idx, &path);
/* O_CLOEXEC: the descriptor is not inherited across exec() */
77 fd = open(path, O_RDONLY | O_CLOEXEC);
80 dnet_backend_log(DNET_LOG_ERROR, "%s: SMACK: %s: lookup-open: size: %llu: %s %d.\n",
81 dnet_dump_id_str(idx->id), path,
82 (unsigned long long)idx->data_size,
/* offset 0, length idx->data_size */
87 err = dnet_send_file_info(state, cmd, fd, 0, idx->data_size);
91 dnet_backend_log(DNET_LOG_INFO, "%s: SMACK: %s: lookup: size: %llu.\n",
92 dnet_dump_id(&cmd->id), path, (unsigned long long)idx->data_size);
/*
 * LOOKUP entry point: initialises an index from the command id with
 * smack_setup_idx() and delegates to leveldb_backend_lookup_raw().
 *
 * NOTE(review): the declaration of idx is not visible in this excerpt, and
 * smack_setup_idx() looks like a leftover from the smack backend - confirm.
 */
103 static int leveldb_backend_lookup(struct leveldb_backend
*s
, void *state
, struct dnet_cmd
*cmd
)
108 smack_setup_idx(&idx, cmd->id.id);
109 return leveldb_backend_lookup_raw(s, &idx, state, cmd);
/*
 * WRITE handler: stores the payload that follows the dnet_io_attr header
 * under the key cmd->id.id (DNET_ID_SIZE bytes) with leveldb_put(), then
 * replies with a dnet_addr_attr followed by a zero-filled dnet_file_info.
 *
 * NOTE(review): the declarations of err/errp, the checks after leveldb_put()
 * and malloc(), and the release of `a` and `errp` are not visible in this
 * excerpt - confirm they exist in the full function.
 */
114 static int leveldb_backend_write(struct leveldb_backend
*s
, void *state
, struct dnet_cmd
*cmd
, void *data
)
116 struct dnet_node
*n
= dnet_get_node_from_state(state
);
119 struct dnet_io_attr
*io
= data
;
120 struct dnet_file_info
*info
;
121 struct dnet_addr_attr
*a
;
123 dnet_convert_io_attr(io
);
/* skip the header; arithmetic on void * relies on the GNU C extension */
125 data
+= sizeof(struct dnet_io_attr
);
/* io->offset is not passed here; it only appears in the log line below */
127 leveldb_put(s
->db
, s
->woptions
, (const char *)cmd
->id
.id
, DNET_ID_SIZE
, data
, io
->size
, &errp
);
/* one allocation holds the address attribute and the file info behind it */
131 a
= malloc(sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
));
136 info
= (struct dnet_file_info
*)(a
+ 1);
138 dnet_fill_addr_attr(n
, a
);
139 dnet_convert_addr_attr(a
);
141 memset(info
, 0, sizeof(struct dnet_file_info
));
142 dnet_convert_file_info(info
);
144 err
= dnet_send_reply(state
, cmd
, a
, sizeof(struct dnet_addr_attr
) + sizeof(struct dnet_file_info
), 0);
146 dnet_backend_log(DNET_LOG_NOTICE
, "%s: LEVELDB: : WRITE: Ok: offset: %llu, size: %llu.\n",
147 dnet_dump_id(&cmd
->id
), (unsigned long long)io
->offset
, (unsigned long long)io
->size
);
151 dnet_backend_log(DNET_LOG_ERROR
, "%s: LEVELDB: : WRITE: error: %s.\n",
152 dnet_dump_id(&cmd
->id
), errp
);
/*
 * READ handler: fetches the value stored under io->id with leveldb_get(),
 * overwrites io->size with the returned length and sends the buffer with
 * dnet_send_read_data().
 *
 * NOTE(review): the key used here is io->id, whereas the write and remove
 * handlers use cmd->id.id - confirm the two always match.
 * NOTE(review): the not-found/error checks and the release of `data` and
 * `errp` are not visible in this excerpt.
 */
156 static int leveldb_backend_read(struct leveldb_backend
*s
, void *state
, struct dnet_cmd
*cmd
, void *iodata
)
158 struct dnet_io_attr
*io
= iodata
;
164 dnet_convert_io_attr(io
);
166 data
= leveldb_get(s
->db
, s
->roptions
, (const char *)io
->id
, DNET_ID_SIZE
, &data_size
, &errp
);
/* report the stored value's length, not the length the client asked for */
170 io
->size
= data_size
;
/* presumably -1 stands for "no file descriptor, send from memory" - TODO confirm */
171 err
= dnet_send_read_data(state
, cmd
, io
, data
, -1, io
->offset
, 0);
179 dnet_backend_log(DNET_LOG_ERROR
, "%s: LEVELDB: READ: error: %s\n",
180 dnet_dump_id(&cmd
->id
), errp
);
/*
 * DEL handler: deletes the key cmd->id.id (DNET_ID_SIZE bytes) with
 * leveldb_delete() and logs errp at error level.
 *
 * NOTE(review): the condition guarding the log call, the returned error code
 * and the release of errp are not visible in this excerpt.
 */
184 static int leveldb_backend_remove(struct leveldb_backend
*s
, void *state __unused
, struct dnet_cmd
*cmd
, void *data __unused
)
188 leveldb_delete(s
->db
, s
->woptions
, (const char *)cmd
->id
.id
, DNET_ID_SIZE
, &errp
);
/* NOTE(review): unlike the other log messages in this file, this format has no trailing "\n" */
190 dnet_backend_log(DNET_LOG_ERROR
, "%s: LEVELDB: REMOVE: error: %s",
191 dnet_dump_id(&cmd
->id
), errp
);
/*
 * Bulk read: treats the payload after the first dnet_io_attr as an array of
 * dnet_io_attr requests and runs leveldb_backend_read() on each of them.
 *
 * The matching DNET_CMD_BULK_READ case in the command handler is commented
 * out in the visible part of this file.
 * NOTE(review): how `ret` is folded into the return value is not visible here.
 */
199 static int leveldb_backend_bulk_read(struct leveldb_backend *s, void *state, struct dnet_cmd *cmd, void *data)
202 struct dnet_io_attr *io = data;
/* the request array starts right behind the leading header */
203 struct dnet_io_attr *ios = io+1;
207 dnet_convert_io_attr(io);
/* io->size is the byte length of the array, so this yields the element count */
208 count = io->size / sizeof(struct dnet_io_attr);
210 for (i = 0; i < count; i++) {
211 ret = leveldb_backend_read(s, state, cmd, &ios[i]);
/*
 * Backend command dispatcher; installed as b->cb.command_handler in
 * dnet_leveldb_config_init(). priv is the struct leveldb_backend instance.
 *
 * NOTE(review): only the DNET_CMD_LOOKUP label is visible in this excerpt;
 * the labels selecting the write, read, stat and remove calls below, the
 * default branch and the return statement are not - confirm against the
 * full function.
 */
222 static int leveldb_backend_command_handler(void *state
, void *priv
, struct dnet_cmd
*cmd
, void *data
)
225 struct leveldb_backend
*s
= priv
;
228 case DNET_CMD_LOOKUP
:
229 err
= leveldb_backend_lookup(s
, state
, cmd
);
232 err
= leveldb_backend_write(s
, state
, cmd
, data
);
235 err
= leveldb_backend_read(s
, state
, cmd
, data
);
/* statistics are taken for the configured storage root */
238 err
= backend_stat(state
, s
->path
, cmd
);
241 err
= leveldb_backend_remove(s
, state
, cmd
, data
);
/* bulk read and range read are disabled */
243 // case DNET_CMD_BULK_READ:
244 // err = leveldb_backend_bulk_read(s, state, cmd, data);
246 // case DNET_CMD_READ_RANGE:
257 static int dnet_leveldb_set_cache_size(struct dnet_config_backend
*b
, char *key __unused
, char *value
)
259 struct leveldb_backend
*s
= b
->data
;
261 s
->cache
= leveldb_cache_create_lru(atol(value
));
265 static int dnet_leveldb_set_write_buffer_size(struct dnet_config_backend
*b
, char *key __unused
, char *value
)
267 struct leveldb_backend
*s
= b
->data
;
269 s
->write_buffer_size
= atol(value
);
273 static int dnet_leveldb_set_block_size(struct dnet_config_backend
*b
, char *key __unused
, char *value
)
275 struct leveldb_backend
*s
= b
->data
;
277 s
->block_size
= atol(value
);
281 static int dnet_leveldb_set_block_restart_interval(struct dnet_config_backend
*b
, char *key __unused
, char *value
)
283 struct leveldb_backend
*s
= b
->data
;
285 s
->block_restart_interval
= atoi(value
);
289 static int dnet_leveldb_set_max_open_files(struct dnet_config_backend
*b
, char *key __unused
, char *value
)
291 struct leveldb_backend
*s
= b
->data
;
293 s
->max_open_files
= atoi(value
);
297 static int dnet_leveldb_set_sync(struct dnet_config_backend
*b
, char *key __unused
, char *value
)
299 struct leveldb_backend
*s
= b
->data
;
301 s
->sync
= atoi(value
);
305 static int dnet_leveldb_set_compression(struct dnet_config_backend
*b
, char *key __unused
, char *value
)
307 struct leveldb_backend
*s
= b
->data
;
309 if (!strcmp(value
, "snappy"))
310 s
->compression
= leveldb_snappy_compression
;
/*
 * Config handler registered for the "log" key.
 *
 * NOTE(review): everything after the local backend pointer is missing from
 * this excerpt, so what is done with `value` cannot be stated from here.
 */
315 static int dnet_leveldb_set_log(struct dnet_config_backend
*b
, char *key __unused
, char *value
)
317 struct leveldb_backend
*s
= b
->data
;
/*
 * Config handler for the "root" key: calls backend_storage_size() for the
 * given directory and keeps a private copy of the path in s->path.
 *
 * NOTE(review): the checks on err and on the strdup() result, and the
 * release of a previously stored path, are not visible in this excerpt.
 */
330 static int dnet_leveldb_set_root(struct dnet_config_backend
*b
, char *key __unused
, char *root
)
332 struct leveldb_backend
*s
= b
->data
;
335 err
= backend_storage_size(b
, root
);
/* s->path is later used as the database location in leveldb_open() */
339 s
->path
= strdup(root
);
/*
 * Reads the object for `id` through smack_read() and writes it to the group
 * id->group_id using a freshly created session.
 *
 * The assignment of this function to b->cb.send is commented out in
 * dnet_leveldb_config_init().
 * NOTE(review): it still goes through s->smack / smack_* helpers, presumably
 * leftovers from the smack backend; the declarations, error handling and the
 * release of data/result/session are not visible in this excerpt.
 */
352 static int leveldb_backend_send(void *state, void *priv, struct dnet_id *id)
354 struct dnet_node *n = dnet_get_node_from_state(state);
355 struct leveldb_backend *s = priv;
360 smack_setup_idx(&idx, id->id);
361 err = smack_read(s->smack, &idx, &data);
365 struct dnet_io_control ctl;
367 memset(&ctl, 0, sizeof(ctl));
373 memcpy(&ctl.id, id, sizeof(struct dnet_id));
376 ctl.io.size = idx.data_size;
380 struct dnet_session *sess = dnet_session_create(n);
/* restrict the session to the single group taken from the id */
381 dnet_session_set_groups(sess, (int *)&id->group_id, 1);
383 err = dnet_write_data_wait(sess, &ctl, (void **)&result);
/*
 * storage_stat callback: zeroes *st and fills it through
 * backend_stat_low_level() for the configured root.
 *
 * NOTE(review): how err is turned into the return value is not visible in
 * this excerpt.
 */
395 int leveldb_backend_storage_stat(void *priv
, struct dnet_stat
*st
)
398 struct leveldb_backend
*s
= priv
;
400 memset(st
, 0, sizeof(struct dnet_stat
));
/* fall back to the current directory when no root has been configured */
402 err
= backend_stat_low_level(s
->path
? s
->path
: ".", st
);
409 static void dnet_leveldb_db_cleanup(struct leveldb_backend
*s
)
411 eblob_cleanup(s
->meta
);
/*
 * Opens the eblob metadata store at "<path>/meta" and saves the handle in
 * s->meta; an error is logged when initialisation fails.
 *
 * NOTE(review): the check on s->meta, the error code and the return
 * statements are not visible in this excerpt.
 */
414 static int dnet_leveldb_db_init(struct leveldb_backend
*s
, struct dnet_config
*c
, const char *path
)
/* static storage: ecfg.file points at this buffer, which is shared by every call */
416 static char meta_path
[300];
417 struct eblob_config ecfg
;
/* snprintf() silently truncates a path that does not fit the buffer */
420 snprintf(meta_path
, sizeof(meta_path
), "%s/meta", path
);
422 memset(&ecfg
, 0, sizeof(ecfg
));
423 ecfg
.file
= meta_path
;
425 ecfg
.blob_flags
= EBLOB_RESERVE_10_PERCENTS
| EBLOB_TRY_OVERWRITE
| EBLOB_NO_FOOTER
;
/* 10 * 1024 * 1024 bytes */
426 ecfg
.blob_size
= 10LLU*1024*1024;
427 ecfg
.defrag_percentage
= 25;
428 ecfg
.defrag_timeout
= 3600;
/* the node's logger is handed to eblob through a pointer cast */
429 ecfg
.log
= (struct eblob_log
*)c
->log
;
431 s
->meta
= eblob_init(&ecfg
);
434 dnet_backend_log(DNET_LOG_ERROR
, "Failed to initialize metadata eblob\n");
440 static void leveldb_backend_cleanup(void *priv
)
442 struct leveldb_backend
*s
= priv
;
444 leveldb_close(s
->db
);
445 leveldb_options_destroy(s
->options
);
446 leveldb_readoptions_destroy(s
->roptions
);
447 leveldb_writeoptions_destroy(s
->woptions
);
448 leveldb_cache_destroy(s
->cache
);
449 //leveldb_comparator_destroy(s->cmp);
450 leveldb_env_destroy(s
->env
);
452 dnet_leveldb_db_cleanup(s
);
456 static ssize_t
dnet_leveldb_db_read(void *priv
, struct dnet_raw_id
*id
, void **datap
)
458 struct leveldb_backend
*s
= priv
;
459 return dnet_db_read_raw(s
->meta
, id
, datap
);
462 static int dnet_leveldb_db_write(void *priv
, struct dnet_raw_id
*id
, void *data
, size_t size
)
464 struct leveldb_backend
*s
= priv
;
465 return dnet_db_write_raw(s
->meta
, id
, data
, size
);
468 static int dnet_leveldb_db_remove(void *priv
, struct dnet_raw_id
*id
, int real_del
)
470 struct leveldb_backend
*s
= priv
;
471 return dnet_db_remove_raw(s
->meta
, id
, real_del
);
474 static int dnet_leveldb_db_iterate(struct dnet_iterate_ctl
*ctl
)
476 struct leveldb_backend
*s
= ctl
->iterate_private
;
477 return dnet_db_iterate(s
->meta
, ctl
);
/*
 * meta_total_elements callback: queries the "leveldb.num-files-at-level<N>"
 * properties of the database and logs each value plus a final count.
 *
 * NOTE(review): the loop over `level`, the way `prop` contributes to `count`,
 * the release of `prop` and the return statement are not visible in this
 * excerpt. The smack_ prefix presumably predates the LevelDB port - confirm.
 */
480 static long long smack_total_elements(void *priv
)
482 struct leveldb_backend
*s
= priv
;
489 snprintf(propname
, sizeof(propname
), "leveldb.num-files-at-level%d", level
);
490 prop
= leveldb_property_value(s
->db
, propname
);
492 dnet_backend_log(DNET_LOG_DEBUG
, "LEVELDB: properties: %s -> %s\n", propname
, prop
);
498 dnet_backend_log(DNET_LOG_DEBUG
, "LEVELDB: count: %lld\n", count
);
/*
 * Backend init hook: installs the backend callbacks, opens the eblob
 * metadata store under "history", builds the LevelDB option objects from the
 * parsed configuration and opens the database at s->path.
 *
 * NOTE(review): local declarations, the checks after dnet_leveldb_db_init()
 * and leveldb_open(), the labels and the return statements are not visible
 * in this excerpt - in particular confirm that err is set before the jump to
 * err_out_cleanup.
 */
503 static int dnet_leveldb_config_init(struct dnet_config_backend
*b
, struct dnet_config
*c
)
505 struct leveldb_backend
*s
= b
->data
;
511 b
->cb
.command_private
= s
;
513 b
->cb
.command_handler
= leveldb_backend_command_handler
;
514 //b->cb.send = leveldb_backend_send;
516 c
->storage_size
= b
->storage_size
;
517 c
->storage_free
= b
->storage_free
;
519 b
->cb
.storage_stat
= leveldb_backend_storage_stat
;
520 b
->cb
.backend_cleanup
= leveldb_backend_cleanup
;
/* metadata callbacks, all backed by the eblob store in s->meta */
522 b
->cb
.meta_read
= dnet_leveldb_db_read
;
523 b
->cb
.meta_write
= dnet_leveldb_db_write
;
524 b
->cb
.meta_remove
= dnet_leveldb_db_remove
;
525 b
->cb
.meta_total_elements
= smack_total_elements
;
526 b
->cb
.meta_iterate
= dnet_leveldb_db_iterate
;
/* relative path: created in the current working directory; mkdir() result is not stored */
528 mkdir("history", 0755);
529 err
= dnet_leveldb_db_init(s
, c
, "history");
533 //s->cmp = leveldb_comparator_create(NULL, CmpDestroy, CmpCompare, CmpName);
534 s
->env
= leveldb_create_default_env();
536 s
->options
= leveldb_options_create();
537 //leveldb_options_set_comparator(s->options, s->cmp);
538 leveldb_options_set_create_if_missing(s
->options
, 1);
/* NOTE(review): s->cache is only created by the "cache_size" handler - confirm the key is mandatory */
539 leveldb_options_set_cache(s
->options
, s
->cache
);
540 leveldb_options_set_env(s
->options
, s
->env
);
541 leveldb_options_set_info_log(s
->options
, NULL
);
542 leveldb_options_set_write_buffer_size(s
->options
, s
->write_buffer_size
);
543 leveldb_options_set_paranoid_checks(s
->options
, 1);
544 leveldb_options_set_max_open_files(s
->options
, s
->max_open_files
);
545 leveldb_options_set_block_size(s
->options
, s
->block_size
);
546 leveldb_options_set_block_restart_interval(s
->options
, s
->block_restart_interval
);
/*
 * NOTE(review): compression is hard-coded to leveldb_no_compression here, so
 * the value stored in s->compression by the "compression" config handler is
 * never applied - this should probably pass s->compression instead.
 */
547 leveldb_options_set_compression(s
->options
, leveldb_no_compression
);
549 s
->roptions
= leveldb_readoptions_create();
550 leveldb_readoptions_set_verify_checksums(s
->roptions
, 1);
551 leveldb_readoptions_set_fill_cache(s
->roptions
, 1);
553 s
->woptions
= leveldb_writeoptions_create();
554 leveldb_writeoptions_set_sync(s
->woptions
, s
->sync
);
556 s
->db
= leveldb_open(s
->options
, s
->path
, &errp
);
558 goto err_out_cleanup
;
/* NOTE(review): only the metadata store is released on this path; the LevelDB objects created above are not visibly freed */
563 dnet_leveldb_db_cleanup(s
);
568 static void dnet_leveldb_config_cleanup(struct dnet_config_backend
*b
)
570 struct leveldb_backend
*s
= b
->data
;
572 leveldb_backend_cleanup(s
);
575 static struct dnet_config_entry dnet_cfg_entries_leveldb
[] = {
576 {"log", dnet_leveldb_set_log
},
577 {"sync", dnet_leveldb_set_sync
},
578 {"root", dnet_leveldb_set_root
},
579 {"cache_size", dnet_leveldb_set_cache_size
},
580 {"write_buffer_size", dnet_leveldb_set_write_buffer_size
},
581 {"block_size", dnet_leveldb_set_block_size
},
582 {"block_restart_interval", dnet_leveldb_set_block_restart_interval
},
583 {"max_open_files", dnet_leveldb_set_max_open_files
},
584 {"compression", dnet_leveldb_set_compression
},
585 // {"", dnet_leveldb_set_},
/*
 * Backend descriptor passed to dnet_backend_register().
 *
 * NOTE(review): one initializer line between the opening brace and .ent is
 * missing from this excerpt (presumably the backend name) - confirm.
 */
588 static struct dnet_config_backend dnet_leveldb_backend
= {
/* table of config keys and its length */
590 .ent
= dnet_cfg_entries_leveldb
,
591 .num
= ARRAY_SIZE(dnet_cfg_entries_leveldb
),
/* size of the per-instance state that the handlers read from b->data */
592 .size
= sizeof(struct leveldb_backend
),
593 .init
= dnet_leveldb_config_init
,
594 .cleanup
= dnet_leveldb_config_cleanup
,
597 int dnet_leveldb_backend_init(void)
599 return dnet_backend_register(&dnet_leveldb_backend
);
/*
 * Counterpart of dnet_leveldb_backend_init(); intentionally does nothing.
 */
void dnet_leveldb_backend_exit(void)
{
	/* The cleanup routine is called explicitly through the backend->cleanup() callback. */
}