Elliptics version update: 2.19.2.7
[elliptics.git] / example / leveldb_backend.c
blob6e6fb183043afe0fc173f146ad652b0cfd8e8290
1 /*
2 * 2012+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/mman.h>
20 #include <sys/wait.h>
22 #include <errno.h>
23 #include <ctype.h>
24 #include <dirent.h>
25 #include <fcntl.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
31 #include "elliptics/packet.h"
32 #include "elliptics/interface.h"
34 #include "backends.h"
35 #include "common.h"
37 #include <leveldb/c.h>
39 #ifndef __unused
40 #define __unused __attribute__ ((unused))
41 #endif
43 struct leveldb_backend
45 int sync;
46 struct eblob_log elog;
47 struct eblob_backend *meta;
49 size_t write_buffer_size;
50 size_t block_size;
51 int block_restart_interval;
52 int max_open_files;
53 int compression;
54 char *path;
55 char *log;
57 leveldb_env_t *env;
58 leveldb_cache_t *cache;
59 leveldb_options_t *options;
60 leveldb_readoptions_t *roptions;
61 leveldb_writeoptions_t *woptions;
62 leveldb_comparator_t *cmp;
64 leveldb_t *db;
68 static int leveldb_backend_lookup_raw(struct leveldb_backend *s, struct index *idx, void *state, struct dnet_cmd *cmd)
70 int err, fd;
71 char *path;
73 err = smack_lookup(s->smack, idx, &path);
74 if (err < 0)
75 goto err_out_exit;
77 fd = open(path, O_RDONLY | O_CLOEXEC);
78 if (fd < 0) {
79 err = -errno;
80 dnet_backend_log(DNET_LOG_ERROR, "%s: SMACK: %s: lookup-open: size: %llu: %s %d.\n",
81 dnet_dump_id_str(idx->id), path,
82 (unsigned long long)idx->data_size,
83 strerror(-err), err);
84 goto err_out_free;
87 err = dnet_send_file_info(state, cmd, fd, 0, idx->data_size);
88 if (err)
89 goto err_out_close;
91 dnet_backend_log(DNET_LOG_INFO, "%s: SMACK: %s: lookup: size: %llu.\n",
92 dnet_dump_id(&cmd->id), path, (unsigned long long)idx->data_size);
94 err_out_close:
95 close(fd);
96 err_out_free:
97 free(path);
98 err_out_exit:
99 return err;
103 static int leveldb_backend_lookup(struct leveldb_backend *s, void *state, struct dnet_cmd *cmd)
106 struct index idx;
108 smack_setup_idx(&idx, cmd->id.id);
109 return leveldb_backend_lookup_raw(s, &idx, state, cmd);
111 return 0;
114 static int leveldb_backend_write(struct leveldb_backend *s, void *state, struct dnet_cmd *cmd, void *data)
116 struct dnet_node *n = dnet_get_node_from_state(state);
117 int err = -2;
118 char *errp = NULL;
119 struct dnet_io_attr *io = data;
120 struct dnet_file_info *info;
121 struct dnet_addr_attr *a;
123 dnet_convert_io_attr(io);
125 data += sizeof(struct dnet_io_attr);
127 leveldb_put(s->db, s->woptions, (const char *)cmd->id.id, DNET_ID_SIZE, data, io->size, &errp);
128 if (errp)
129 goto err_out_exit;
131 a = malloc(sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info));
132 if (!a) {
133 err = -ENOMEM;
134 goto err_out_exit;
136 info = (struct dnet_file_info *)(a + 1);
138 dnet_fill_addr_attr(n, a);
139 dnet_convert_addr_attr(a);
141 memset(info, 0, sizeof(struct dnet_file_info));
142 dnet_convert_file_info(info);
144 err = dnet_send_reply(state, cmd, a, sizeof(struct dnet_addr_attr) + sizeof(struct dnet_file_info), 0);
146 dnet_backend_log(DNET_LOG_NOTICE, "%s: LEVELDB: : WRITE: Ok: offset: %llu, size: %llu.\n",
147 dnet_dump_id(&cmd->id), (unsigned long long)io->offset, (unsigned long long)io->size);
149 return err;
150 err_out_exit:
151 dnet_backend_log(DNET_LOG_ERROR, "%s: LEVELDB: : WRITE: error: %s.\n",
152 dnet_dump_id(&cmd->id), errp);
153 return err;
156 static int leveldb_backend_read(struct leveldb_backend *s, void *state, struct dnet_cmd *cmd, void *iodata)
158 struct dnet_io_attr *io = iodata;
159 char *data;
160 size_t data_size;
161 int err = -1;
162 char *errp = NULL;
164 dnet_convert_io_attr(io);
166 data = leveldb_get(s->db, s->roptions, (const char *)io->id, DNET_ID_SIZE, &data_size, &errp);
167 if (errp)
168 goto err_out_exit;
170 io->size = data_size;
171 err = dnet_send_read_data(state, cmd, io, data, -1, io->offset, 0);
172 if (err)
173 goto err_out_free;
175 err_out_free:
176 free(data);
177 err_out_exit:
178 if (err < 0)
179 dnet_backend_log(DNET_LOG_ERROR, "%s: LEVELDB: READ: error: %s\n",
180 dnet_dump_id(&cmd->id), errp);
181 return err;
184 static int leveldb_backend_remove(struct leveldb_backend *s, void *state __unused, struct dnet_cmd *cmd, void *data __unused)
186 char *errp = NULL;
188 leveldb_delete(s->db, s->woptions, (const char *)cmd->id.id, DNET_ID_SIZE, &errp);
189 if (errp) {
190 dnet_backend_log(DNET_LOG_ERROR, "%s: LEVELDB: REMOVE: error: %s",
191 dnet_dump_id(&cmd->id), errp);
192 return -2;
195 return 0;
199 static int leveldb_backend_bulk_read(struct leveldb_backend *s, void *state, struct dnet_cmd *cmd, void *data)
201 int err = -1, ret;
202 struct dnet_io_attr *io = data;
203 struct dnet_io_attr *ios = io+1;
204 uint64_t count = 0;
205 uint64_t i;
207 dnet_convert_io_attr(io);
208 count = io->size / sizeof(struct dnet_io_attr);
210 for (i = 0; i < count; i++) {
211 ret = leveldb_backend_read(s, state, cmd, &ios[i]);
212 if (!ret)
213 err = 0;
214 else if (err == -1)
215 err = ret;
218 return err;
222 static int leveldb_backend_command_handler(void *state, void *priv, struct dnet_cmd *cmd, void *data)
224 int err;
225 struct leveldb_backend *s = priv;
227 switch (cmd->cmd) {
228 case DNET_CMD_LOOKUP:
229 err = leveldb_backend_lookup(s, state, cmd);
230 break;
231 case DNET_CMD_WRITE:
232 err = leveldb_backend_write(s, state, cmd, data);
233 break;
234 case DNET_CMD_READ:
235 err = leveldb_backend_read(s, state, cmd, data);
236 break;
237 case DNET_CMD_STAT:
238 err = backend_stat(state, s->path, cmd);
239 break;
240 case DNET_CMD_DEL:
241 err = leveldb_backend_remove(s, state, cmd, data);
242 break;
243 // case DNET_CMD_BULK_READ:
244 // err = leveldb_backend_bulk_read(s, state, cmd, data);
245 // break;
246 // case DNET_CMD_READ_RANGE:
247 // err = -ENOTSUP;
248 // break;
249 default:
250 err = -ENOTSUP;
251 break;
254 return err;
257 static int dnet_leveldb_set_cache_size(struct dnet_config_backend *b, char *key __unused, char *value)
259 struct leveldb_backend *s = b->data;
261 s->cache = leveldb_cache_create_lru(atol(value));
262 return 0;
265 static int dnet_leveldb_set_write_buffer_size(struct dnet_config_backend *b, char *key __unused, char *value)
267 struct leveldb_backend *s = b->data;
269 s->write_buffer_size = atol(value);
270 return 0;
273 static int dnet_leveldb_set_block_size(struct dnet_config_backend *b, char *key __unused, char *value)
275 struct leveldb_backend *s = b->data;
277 s->block_size = atol(value);
278 return 0;
281 static int dnet_leveldb_set_block_restart_interval(struct dnet_config_backend *b, char *key __unused, char *value)
283 struct leveldb_backend *s = b->data;
285 s->block_restart_interval = atoi(value);
286 return 0;
289 static int dnet_leveldb_set_max_open_files(struct dnet_config_backend *b, char *key __unused, char *value)
291 struct leveldb_backend *s = b->data;
293 s->max_open_files = atoi(value);
294 return 0;
297 static int dnet_leveldb_set_sync(struct dnet_config_backend *b, char *key __unused, char *value)
299 struct leveldb_backend *s = b->data;
301 s->sync = atoi(value);
302 return 0;
305 static int dnet_leveldb_set_compression(struct dnet_config_backend *b, char *key __unused, char *value)
307 struct leveldb_backend *s = b->data;
309 if (!strcmp(value, "snappy"))
310 s->compression = leveldb_snappy_compression;
312 return 0;
315 static int dnet_leveldb_set_log(struct dnet_config_backend *b, char *key __unused, char *value)
317 struct leveldb_backend *s = b->data;
318 char *tmp;
320 tmp = strdup(value);
321 if (!tmp)
322 return -ENOMEM;
324 if (s->log)
325 free(s->log);
326 s->log = tmp;
327 return 0;
330 static int dnet_leveldb_set_root(struct dnet_config_backend *b, char *key __unused, char *root)
332 struct leveldb_backend *s = b->data;
333 int err;
335 err = backend_storage_size(b, root);
336 if (err)
337 goto err_out_exit;
339 s->path = strdup(root);
340 if (!s->path) {
341 err = -ENOMEM;
342 goto err_out_exit;
345 return 0;
347 err_out_exit:
348 return err;
352 static int leveldb_backend_send(void *state, void *priv, struct dnet_id *id)
354 struct dnet_node *n = dnet_get_node_from_state(state);
355 struct leveldb_backend *s = priv;
356 char *result = NULL;
357 char *data;
358 int err;
360 smack_setup_idx(&idx, id->id);
361 err = smack_read(s->smack, &idx, &data);
362 if (err)
363 goto err_out_exit;
365 struct dnet_io_control ctl;
367 memset(&ctl, 0, sizeof(ctl));
369 ctl.fd = -1;
371 ctl.data = data;
373 memcpy(&ctl.id, id, sizeof(struct dnet_id));
375 ctl.io.offset = 0;
376 ctl.io.size = idx.data_size;
377 ctl.io.type = 0;
378 ctl.io.flags = 0;
380 struct dnet_session *sess = dnet_session_create(n);
381 dnet_session_set_groups(sess, (int *)&id->group_id, 1);
383 err = dnet_write_data_wait(sess, &ctl, (void **)&result);
384 if (err < 0)
385 goto err_out_free;
386 free(result);
387 err = 0;
389 err_out_free:
390 free(data);
391 err_out_exit:
392 return err;
395 int leveldb_backend_storage_stat(void *priv, struct dnet_stat *st)
397 int err;
398 struct leveldb_backend *s = priv;
400 memset(st, 0, sizeof(struct dnet_stat));
402 err = backend_stat_low_level(s->path ? s->path : ".", st);
403 if (err)
404 return err;
406 return 0;
409 static void dnet_leveldb_db_cleanup(struct leveldb_backend *s)
411 eblob_cleanup(s->meta);
414 static int dnet_leveldb_db_init(struct leveldb_backend *s, struct dnet_config *c, const char *path)
416 static char meta_path[300];
417 struct eblob_config ecfg;
418 int err = 0;
420 snprintf(meta_path, sizeof(meta_path), "%s/meta", path);
422 memset(&ecfg, 0, sizeof(ecfg));
423 ecfg.file = meta_path;
424 ecfg.sync = 300;
425 ecfg.blob_flags = EBLOB_RESERVE_10_PERCENTS | EBLOB_TRY_OVERWRITE | EBLOB_NO_FOOTER;
426 ecfg.blob_size = 10LLU*1024*1024;
427 ecfg.defrag_percentage = 25;
428 ecfg.defrag_timeout = 3600;
429 ecfg.log = (struct eblob_log *)c->log;
431 s->meta = eblob_init(&ecfg);
432 if (!s->meta) {
433 err = -EINVAL;
434 dnet_backend_log(DNET_LOG_ERROR, "Failed to initialize metadata eblob\n");
437 return err;
440 static void leveldb_backend_cleanup(void *priv)
442 struct leveldb_backend *s = priv;
444 leveldb_close(s->db);
445 leveldb_options_destroy(s->options);
446 leveldb_readoptions_destroy(s->roptions);
447 leveldb_writeoptions_destroy(s->woptions);
448 leveldb_cache_destroy(s->cache);
449 //leveldb_comparator_destroy(s->cmp);
450 leveldb_env_destroy(s->env);
452 dnet_leveldb_db_cleanup(s);
453 free(s->path);
456 static ssize_t dnet_leveldb_db_read(void *priv, struct dnet_raw_id *id, void **datap)
458 struct leveldb_backend *s = priv;
459 return dnet_db_read_raw(s->meta, id, datap);
462 static int dnet_leveldb_db_write(void *priv, struct dnet_raw_id *id, void *data, size_t size)
464 struct leveldb_backend *s = priv;
465 return dnet_db_write_raw(s->meta, id, data, size);
468 static int dnet_leveldb_db_remove(void *priv, struct dnet_raw_id *id, int real_del)
470 struct leveldb_backend *s = priv;
471 return dnet_db_remove_raw(s->meta, id, real_del);
474 static int dnet_leveldb_db_iterate(struct dnet_iterate_ctl *ctl)
476 struct leveldb_backend *s = ctl->iterate_private;
477 return dnet_db_iterate(s->meta, ctl);
480 static long long smack_total_elements(void *priv)
482 struct leveldb_backend *s = priv;
483 char *prop;
484 char propname[256];
485 int level = 0;
486 long long count = 0;
488 do {
489 snprintf(propname, sizeof(propname), "leveldb.num-files-at-level%d", level);
490 prop = leveldb_property_value(s->db, propname);
491 if (prop) {
492 dnet_backend_log(DNET_LOG_DEBUG, "LEVELDB: properties: %s -> %s\n", propname, prop);
493 count += atoi(prop);
495 level++;
496 } while (prop);
498 dnet_backend_log(DNET_LOG_DEBUG, "LEVELDB: count: %lld\n", count);
500 return count;
503 static int dnet_leveldb_config_init(struct dnet_config_backend *b, struct dnet_config *c)
505 struct leveldb_backend *s = b->data;
506 int err;
507 char *errp = NULL;
509 c->cb = &b->cb;
511 b->cb.command_private = s;
513 b->cb.command_handler = leveldb_backend_command_handler;
514 //b->cb.send = leveldb_backend_send;
516 c->storage_size = b->storage_size;
517 c->storage_free = b->storage_free;
519 b->cb.storage_stat = leveldb_backend_storage_stat;
520 b->cb.backend_cleanup = leveldb_backend_cleanup;
522 b->cb.meta_read = dnet_leveldb_db_read;
523 b->cb.meta_write = dnet_leveldb_db_write;
524 b->cb.meta_remove = dnet_leveldb_db_remove;
525 b->cb.meta_total_elements = smack_total_elements;
526 b->cb.meta_iterate = dnet_leveldb_db_iterate;
528 mkdir("history", 0755);
529 err = dnet_leveldb_db_init(s, c, "history");
530 if (err)
531 goto err_out_exit;
533 //s->cmp = leveldb_comparator_create(NULL, CmpDestroy, CmpCompare, CmpName);
534 s->env = leveldb_create_default_env();
536 s->options = leveldb_options_create();
537 //leveldb_options_set_comparator(s->options, s->cmp);
538 leveldb_options_set_create_if_missing(s->options, 1);
539 leveldb_options_set_cache(s->options, s->cache);
540 leveldb_options_set_env(s->options, s->env);
541 leveldb_options_set_info_log(s->options, NULL);
542 leveldb_options_set_write_buffer_size(s->options, s->write_buffer_size);
543 leveldb_options_set_paranoid_checks(s->options, 1);
544 leveldb_options_set_max_open_files(s->options, s->max_open_files);
545 leveldb_options_set_block_size(s->options, s->block_size);
546 leveldb_options_set_block_restart_interval(s->options, s->block_restart_interval);
547 leveldb_options_set_compression(s->options, leveldb_no_compression);
549 s->roptions = leveldb_readoptions_create();
550 leveldb_readoptions_set_verify_checksums(s->roptions, 1);
551 leveldb_readoptions_set_fill_cache(s->roptions, 1);
553 s->woptions = leveldb_writeoptions_create();
554 leveldb_writeoptions_set_sync(s->woptions, s->sync);
556 s->db = leveldb_open(s->options, s->path, &errp);
557 if (!s->db || errp)
558 goto err_out_cleanup;
560 return 0;
562 err_out_cleanup:
563 dnet_leveldb_db_cleanup(s);
564 err_out_exit:
565 return err;
568 static void dnet_leveldb_config_cleanup(struct dnet_config_backend *b)
570 struct leveldb_backend *s = b->data;
572 leveldb_backend_cleanup(s);
575 static struct dnet_config_entry dnet_cfg_entries_leveldb[] = {
576 {"log", dnet_leveldb_set_log},
577 {"sync", dnet_leveldb_set_sync},
578 {"root", dnet_leveldb_set_root},
579 {"cache_size", dnet_leveldb_set_cache_size},
580 {"write_buffer_size", dnet_leveldb_set_write_buffer_size},
581 {"block_size", dnet_leveldb_set_block_size},
582 {"block_restart_interval", dnet_leveldb_set_block_restart_interval},
583 {"max_open_files", dnet_leveldb_set_max_open_files},
584 {"compression", dnet_leveldb_set_compression},
585 // {"", dnet_leveldb_set_},
588 static struct dnet_config_backend dnet_leveldb_backend = {
589 .name = "leveldb",
590 .ent = dnet_cfg_entries_leveldb,
591 .num = ARRAY_SIZE(dnet_cfg_entries_leveldb),
592 .size = sizeof(struct leveldb_backend),
593 .init = dnet_leveldb_config_init,
594 .cleanup = dnet_leveldb_config_cleanup,
597 int dnet_leveldb_backend_init(void)
599 return dnet_backend_register(&dnet_leveldb_backend);
602 void dnet_leveldb_backend_exit(void)
604 /* cleanup routing will be called explicitly through backend->cleanup() callback */