Elliptics version update: 2.19.2.8
[elliptics.git] / bindings / cpp / node.cpp
bloba8b2da8c7388575d09476589fbb7493723bdf251
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #define _XOPEN_SOURCE 600
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/mman.h>
22 #include <sys/wait.h>
24 #include <errno.h>
25 #include <ctype.h>
26 #include <dirent.h>
27 #include <fcntl.h>
28 #include <pthread.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <unistd.h>
34 #include <algorithm>
35 #include <iostream>
36 #include <stdexcept>
37 #include <string>
38 #include <sstream>
39 #include <vector>
41 #include <boost/algorithm/string.hpp>
42 #include <elliptics/cppdef.h>
44 using namespace ioremap::elliptics;
46 node::node(logger &l) : m_node(NULL), m_log(NULL)
48 struct dnet_config cfg;
50 memset(&cfg, 0, sizeof(cfg));
52 cfg.sock_type = SOCK_STREAM;
53 cfg.proto = IPPROTO_TCP;
54 cfg.wait_timeout = 5;
55 cfg.check_timeout = 20;
57 m_log = reinterpret_cast<logger *>(l.clone());
58 cfg.log = m_log->get_dnet_log();
60 snprintf(cfg.addr, sizeof(cfg.addr), "0.0.0.0");
61 snprintf(cfg.port, sizeof(cfg.port), "0");
63 m_node = dnet_node_create(&cfg);
64 if (!m_node) {
65 delete m_log;
66 throw std::bad_alloc();
70 node::node(logger &l, struct dnet_config &cfg) : m_node(NULL), m_log(NULL)
72 cfg.sock_type = SOCK_STREAM;
73 cfg.proto = IPPROTO_TCP;
75 m_log = reinterpret_cast<logger *>(l.clone());
76 cfg.log = m_log->get_dnet_log();
78 snprintf(cfg.addr, sizeof(cfg.addr), "0.0.0.0");
79 snprintf(cfg.port, sizeof(cfg.port), "0");
81 m_node = dnet_node_create(&cfg);
82 if (!m_node) {
83 delete m_log;
84 throw std::bad_alloc();
88 node::node(logger &l, const std::string &config_path) : m_node(NULL), m_log(NULL)
90 struct dnet_config cfg;
91 memset(&cfg, 0, sizeof(struct dnet_config));
93 cfg.sock_type = SOCK_STREAM;
94 cfg.proto = IPPROTO_TCP;
96 m_log = reinterpret_cast<logger *>(l.clone());
97 cfg.log = m_log->get_dnet_log();
99 std::list<addr_tuple> remotes;
100 std::vector<int> groups;
102 parse_config(config_path, cfg, remotes, groups, cfg.log->log_level);
104 m_node = dnet_node_create(&cfg);
105 if (!m_node) {
106 delete m_log;
107 throw std::bad_alloc();
110 for (std::list<addr_tuple>::iterator it = remotes.begin(); it != remotes.end(); ++it) {
111 try {
112 add_remote(it->host.c_str(), it->port, it->family);
113 } catch (...) {
114 continue;
119 node::~node()
121 dnet_node_destroy(m_node);
122 delete m_log;
125 session::session(node &n) : m_node(&n)
127 m_session = dnet_session_create(m_node->m_node);
129 if (!m_session)
130 throw std::bad_alloc();
133 session::~session()
137 void node::parse_config(const std::string &path, struct dnet_config &cfg,
138 std::list<addr_tuple> &remotes,
139 std::vector<int> &groups,
140 int &log_level)
142 std::ifstream in(path.c_str());
143 std::string line;
144 int line_num = 0;
146 while (!in.eof() && in.good()) {
147 line.resize(1024);
149 in.getline((char *)line.data(), line.size());
150 size_t len = in.gcount();
152 line.resize(len);
154 if (in.eof() || !in.good())
155 break;
157 boost::trim(line);
158 line_num++;
160 if (line.size() < 3 || line.data()[0] == '#')
161 continue;
163 std::vector<std::string> strs;
164 boost::split(strs, line, boost::is_any_of("="));
166 std::string key = strs[0];
167 boost::trim(key);
169 if (strs.size() != 2) {
170 std::ostringstream str;
171 str << path << ": invalid elliptics config: line: " << line_num <<
172 ", key: " << key << "': string is broken: size: " << strs.size();
173 throw std::runtime_error(str.str());
175 std::string value = strs[1];
176 boost::trim(value);
178 if (key == "remote") {
179 std::vector<std::string> rem;
180 boost::split(rem, value, boost::is_any_of(" "));
182 for (std::vector<std::string>::iterator it = rem.begin(); it != rem.end(); ++it) {
183 std::string addr_str = *it;
184 if (dnet_parse_addr((char *)addr_str.c_str(), &cfg)) {
185 std::ostringstream str;
186 str << path << ": invalid elliptics config: '" << key << "' ";
187 str << path << ": invalid elliptics config: line: " << line_num <<
188 ", key: '" << key << "': remote addr is invalid";
189 throw std::runtime_error(str.str());
192 addr_tuple addr(cfg.addr, atoi(cfg.port), cfg.family);
193 remotes.push_back(addr);
197 if (key == "groups") {
198 std::vector<std::string> gr;
199 boost::split(gr, value, boost::is_any_of(":"));
201 for (std::vector<std::string>::iterator it = gr.begin(); it != gr.end(); ++it) {
202 int group = atoi(it->c_str());
204 if (group != 0)
205 groups.push_back(group);
209 if (key == "check_timeout")
210 cfg.check_timeout = strtoul(value.c_str(), NULL, 0);
211 if (key == "wait_timeout")
212 cfg.wait_timeout = strtoul(value.c_str(), NULL, 0);
213 if (key == "log_level")
214 log_level = strtoul(value.c_str(), NULL, 0);
218 void node::add_remote(const char *addr, const int port, const int family)
220 struct dnet_config cfg;
221 int err;
223 memset(&cfg, 0, sizeof(cfg));
225 cfg.family = family;
226 snprintf(cfg.addr, sizeof(cfg.addr), "%s", addr);
227 snprintf(cfg.port, sizeof(cfg.port), "%d", port);
229 err = dnet_add_state(m_node, &cfg);
230 if (err) {
231 std::ostringstream str;
232 str << "Failed to add remote addr " << addr << ":" << port << ": " << err;
233 throw std::runtime_error(str.str());
237 void node::set_timeouts(const int wait_timeout, const int check_timeout)
239 dnet_set_timeouts(m_node, wait_timeout, check_timeout);
242 void session::add_groups(std::vector<int> &groups)
244 if (dnet_session_set_groups(m_session, (int *)&groups[0], groups.size()))
245 throw std::bad_alloc();
246 this->groups = groups;
249 void session::read_file(struct dnet_id &id, const std::string &file, uint64_t offset, uint64_t size)
251 int err;
253 err = dnet_read_file_id(m_session, file.c_str(), &id, offset, size);
254 if (err) {
255 std::ostringstream str;
256 str << dnet_dump_id(&id) << ": READ: " << file << ": offset: " << offset << ", size: " << size << ": " << err;
257 throw std::runtime_error(str.str());
261 void session::read_file(const std::string &remote, const std::string &file, uint64_t offset, uint64_t size, int type)
263 int err;
265 err = dnet_read_file(m_session, file.c_str(), remote.data(), remote.size(), offset, size, type);
266 if (err) {
267 struct dnet_id id;
268 transform(remote, id);
269 id.type = 0;
271 std::ostringstream str;
272 str << dnet_dump_id(&id) << ": READ: " << file << ": offset: " << offset << ", size: " << size << ": " << err;
273 throw std::runtime_error(str.str());
277 void session::write_file(struct dnet_id &id, const std::string &file, uint64_t local_offset,
278 uint64_t offset, uint64_t size, uint64_t cflags, unsigned int ioflags)
280 int err = dnet_write_file_id(m_session, file.c_str(), &id, local_offset, offset, size, cflags, ioflags);
281 if (err) {
282 std::ostringstream str;
283 str << dnet_dump_id(&id) << ": WRITE: " << file << ", local_offset: " << local_offset <<
284 ", offset: " << offset << ", size: " << size << ": " << err;
285 throw std::runtime_error(str.str());
288 void session::write_file(const std::string &remote, const std::string &file, uint64_t local_offset, uint64_t offset, uint64_t size,
289 uint64_t cflags, unsigned int ioflags, int type)
291 int err = dnet_write_file(m_session, file.c_str(), remote.data(), remote.size(),
292 local_offset, offset, size, cflags, ioflags, type);
293 if (err) {
294 struct dnet_id id;
295 transform(remote, id);
296 id.type = 0;
298 std::ostringstream str;
299 str << dnet_dump_id(&id) << ": WRITE: " << file << ", local_offset: " << local_offset <<
300 ", offset: " << offset << ", size: " << size << ": " << err;
301 throw std::runtime_error(str.str());
305 std::string session::read_data_wait(struct dnet_id &id, uint64_t offset, uint64_t size,
306 uint64_t cflags, uint32_t ioflags)
308 struct dnet_io_attr io;
309 int err;
311 memset(&io, 0, sizeof(io));
312 io.size = size;
313 io.offset = offset;
314 io.flags = ioflags;
315 io.type = id.type;
317 memcpy(io.id, id.id, DNET_ID_SIZE);
318 memcpy(io.parent, id.id, DNET_ID_SIZE);
320 void *data = dnet_read_data_wait(m_session, &id, &io, cflags, &err);
321 if (!data) {
322 std::ostringstream str;
323 str << dnet_dump_id(&id) << ": READ: size: " << size << ": err: " << strerror(-err) << ": " << err;
324 throw std::runtime_error(str.str());
327 std::string ret = std::string((const char *)data + sizeof(struct dnet_io_attr), io.size - sizeof(struct dnet_io_attr));
328 free(data);
330 return ret;
333 std::string session::read_data_wait(const std::string &remote, uint64_t offset, uint64_t size,
334 uint64_t cflags, uint32_t ioflags, int type)
336 struct dnet_id id;
338 transform(remote, id);
339 id.type = type;
341 return read_data_wait(id, offset, size, cflags, ioflags);
344 void session::prepare_latest(struct dnet_id &id, uint64_t cflags, std::vector<int> &groups)
346 struct dnet_read_latest_prepare pr;
347 int err;
349 memset(&pr, 0, sizeof(struct dnet_read_latest_prepare));
351 pr.s = m_session;
352 pr.id = id;
353 pr.cflags = cflags;
355 pr.group = (int *)malloc(groups.size() * sizeof(int));
356 if (!pr.group) {
357 std::ostringstream str;
359 str << dnet_dump_id(&id) << ": prepare_latest: allocation failure: group num: " << groups.size();
360 throw std::runtime_error(str.str());
362 pr.group_num = groups.size();
364 for (unsigned i = 0; i < groups.size(); ++i)
365 pr.group[i] = groups[i];
367 err = dnet_read_latest_prepare(&pr);
368 if (!err) {
369 try {
370 groups.clear();
372 for (int i = 0; i < pr.group_num; ++i)
373 groups.push_back(pr.group[i]);
374 } catch (...) {
375 free(pr.group);
376 throw;
380 free(pr.group);
382 if (!groups.size())
383 err = -ENOENT;
385 if (err) {
386 std::ostringstream str;
388 str << dnet_dump_id(&id) << ": prepare_latest: groups: " << groups.size() << ": err: " << strerror(-err) << ": " << err;
389 throw std::runtime_error(str.str());
393 std::string session::read_latest(struct dnet_id &id, uint64_t offset, uint64_t size,
394 uint64_t cflags, uint32_t ioflags)
396 struct dnet_io_attr io;
397 void *data;
398 int err;
400 memset(&io, 0, sizeof(io));
401 io.size = size;
402 io.offset = offset;
403 io.flags = ioflags;
404 io.type = id.type;
405 io.num = groups.size();
407 memcpy(io.id, id.id, DNET_ID_SIZE);
408 memcpy(io.parent, id.id, DNET_ID_SIZE);
410 err = dnet_read_latest(m_session, &id, &io, cflags, &data);
411 if (err < 0) {
412 std::ostringstream str;
413 str << dnet_dump_id(&id) << ": READ: size: " << size << ": err: " << strerror(-err) << ": " << err;
414 throw std::runtime_error(str.str());
417 std::string ret = std::string((const char *)data + sizeof(struct dnet_io_attr), io.size - sizeof(struct dnet_io_attr));
418 free(data);
420 return ret;
423 std::string session::read_latest(const std::string &remote, uint64_t offset, uint64_t size,
424 uint64_t cflags, uint32_t ioflags, int type)
426 struct dnet_id id;
428 transform(remote, id);
429 id.type = type;
431 return read_latest(id, offset, size, cflags, ioflags);
434 std::string session::write_cache(struct dnet_id &id, const std::string &str,
435 uint64_t cflags, unsigned int ioflags, long timeout)
437 struct dnet_io_control ctl;
439 memset(&ctl, 0, sizeof(ctl));
441 ctl.cflags = cflags;
442 ctl.data = str.data();
444 ctl.io.flags = ioflags | DNET_IO_FLAGS_CACHE;
445 ctl.io.start = timeout;
446 ctl.io.size = str.size();
447 ctl.io.type = id.type;
448 ctl.io.num = str.size();
450 memcpy(&ctl.id, &id, sizeof(struct dnet_id));
452 ctl.fd = -1;
454 char *result = NULL;
455 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
456 if (err < 0) {
457 std::ostringstream string;
458 string << dnet_dump_id(&id) << ": WRITE: size: " << str.size() << ", err: " << err;
459 throw std::runtime_error(string.str());
462 std::string ret((const char *)result, err);
463 free(result);
465 return ret;
468 std::string session::write_cache(const std::string &key, const std::string &str,
469 uint64_t cflags, unsigned int ioflags, long timeout)
471 struct dnet_id id;
473 transform(key, id);
474 id.type = 0;
475 id.group_id = 0;
477 return write_cache(id, str, cflags, ioflags, timeout);
480 std::string session::write_compare_and_swap(const struct dnet_id &id, const std::string &str,
481 const dnet_id &old_csum, uint64_t remote_offset, uint64_t cflags, unsigned int ioflags) {
482 struct dnet_io_control ctl;
484 memset(&ctl, 0, sizeof(ctl));
486 ctl.cflags = cflags;
487 ctl.data = str.data();
489 ctl.io.flags = ioflags | DNET_IO_FLAGS_COMPARE_AND_SWAP;
490 ctl.io.offset = remote_offset;
491 ctl.io.size = str.size();
492 ctl.io.type = id.type;
493 ctl.io.num = str.size() + remote_offset;
495 memcpy(&ctl.id, &id, sizeof(struct dnet_id));
496 memcpy(&ctl.io.parent, &old_csum.id, DNET_ID_SIZE);
498 ctl.fd = -1;
500 char *result = NULL;
501 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
502 if (err < 0) {
503 std::ostringstream string;
504 string << dnet_dump_id(&id) << ": WRITE: size: " << str.size() << ", err: " << err;
505 throw std::runtime_error(string.str());
508 std::string ret((const char *)result, err);
509 free(result);
511 return ret;
514 std::string session::write_compare_and_swap(const std::string &remote, const std::string &str, const struct dnet_id &old_csum,
515 uint64_t remote_offset, uint64_t cflags, unsigned int ioflags, int type)
517 struct dnet_id id;
519 transform(remote, id);
520 id.type = type;
521 id.group_id = 0;
523 return write_compare_and_swap(id, str, old_csum, remote_offset, cflags, ioflags);
526 std::string session::write_data_wait(struct dnet_id &id, const std::string &str,
527 uint64_t remote_offset, uint64_t cflags, unsigned int ioflags)
529 struct dnet_io_control ctl;
531 memset(&ctl, 0, sizeof(ctl));
533 ctl.cflags = cflags;
534 ctl.data = str.data();
536 ctl.io.flags = ioflags;
537 ctl.io.offset = remote_offset;
538 ctl.io.size = str.size();
539 ctl.io.type = id.type;
540 ctl.io.num = str.size() + remote_offset;
542 memcpy(&ctl.id, &id, sizeof(struct dnet_id));
544 ctl.fd = -1;
546 char *result = NULL;
547 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
548 if (err < 0) {
549 std::ostringstream string;
550 string << dnet_dump_id(&id) << ": WRITE: size: " << str.size() << ", err: " << err;
551 throw std::runtime_error(string.str());
554 std::string ret((const char *)result, err);
555 free(result);
557 return ret;
560 std::string session::write_data_wait(const std::string &remote, const std::string &str,
561 uint64_t remote_offset, uint64_t cflags, unsigned int ioflags, int type)
563 struct dnet_id id;
565 transform(remote, id);
566 id.type = type;
567 id.group_id = 0;
569 return write_data_wait(id, str, remote_offset, cflags, ioflags);
572 std::string session::lookup_addr(const std::string &remote, const int group_id)
574 char buf[128];
576 int err = dnet_lookup_addr(m_session, remote.data(), remote.size(), NULL, group_id, buf, sizeof(buf));
577 if (err < 0) {
578 std::ostringstream str;
579 str << "Failed to lookup in group " << group_id << ": key size: " << remote.size() << ", err: " << err;
580 throw std::runtime_error(str.str());
583 return std::string((const char *)buf, strlen(buf));
586 std::string session::lookup_addr(const struct dnet_id &id)
588 char buf[128];
590 int err = dnet_lookup_addr(m_session, NULL, 0, (struct dnet_id *)&id, id.group_id, buf, sizeof(buf));
591 if (err < 0) {
592 std::ostringstream str;
593 str << "Failed to lookup " << dnet_dump_id(&id) << ": err: " << err;
594 throw std::runtime_error(str.str());
597 return std::string((const char *)buf, strlen(buf));
600 std::string session::create_metadata(const struct dnet_id &id, const std::string &obj,
601 const std::vector<int> &groups, const struct timespec &ts)
603 struct dnet_metadata_control ctl;
604 struct dnet_meta_container mc;
605 int err;
607 memset(&mc, 0, sizeof(struct dnet_meta_container));
608 memset(&ctl, 0, sizeof(struct dnet_metadata_control));
610 ctl.obj = (char *)obj.data();
611 ctl.len = obj.size();
613 ctl.groups = (int *)&groups[0];
614 ctl.group_num = groups.size();
616 ctl.ts = ts;
617 ctl.id = id;
619 err = dnet_create_metadata(m_session, &ctl, &mc);
620 if (err) {
621 std::ostringstream str;
622 str << "Failed to create metadata: key: " << dnet_dump_id(&id) << ", err: " << err;
623 throw std::runtime_error(str.str());
626 std::string ret;
628 try {
629 ret.assign((char *)mc.data, mc.size);
630 } catch (...) {
631 free(mc.data);
632 throw;
635 free(mc.data);
636 return ret;
639 int session::write_metadata(const struct dnet_id &id, const std::string &obj,
640 const std::vector<int> &groups, const struct timespec &ts, uint64_t cflags)
642 int err;
643 std::string meta;
644 struct dnet_meta_container mc;
646 if (dnet_flags(m_node->m_node) & DNET_CFG_NO_META)
647 return 0;
649 meta = create_metadata(id, obj, groups, ts);
651 mc.data = (void *)meta.data();
652 mc.size = meta.size();
654 mc.id = id;
656 err = dnet_write_metadata(m_session, &mc, 1, cflags);
657 if (err) {
658 std::ostringstream str;
659 str << "Failed to write metadata: key: " << dnet_dump_id(&id) << ", err: " << err;
660 throw std::runtime_error(str.str());
663 return 0;
666 void session::transform(const std::string &data, struct dnet_id &id)
668 dnet_transform(m_node->m_node, (void *)data.data(), data.size(), &id);
671 void session::lookup(const struct dnet_id &id, const callback &c)
673 int err = dnet_lookup_object(m_session, (struct dnet_id *)&id, 0,
674 callback::complete_callback,
675 (void *)&c);
677 if (err) {
678 std::ostringstream str;
679 str << "Failed to lookup ID " << dnet_dump_id(&id) << ": " << err;
680 throw std::runtime_error(str.str());
684 void session::lookup(const std::string &data, const callback &c)
686 struct dnet_id id;
687 int error = -ENOENT, i, num, *g;
689 transform(data, id);
690 id.type = 0;
692 num = dnet_mix_states(m_session, &id, &g);
693 if (num < 0)
694 throw std::bad_alloc();
696 for (i=0; i<num; ++i) {
697 id.group_id = g[i];
699 try {
700 lookup(id, c);
701 } catch (...) {
702 continue;
705 error = 0;
706 break;
709 free(g);
711 if (error) {
712 std::ostringstream str;
713 str << "Failed to lookup data object: key: " << dnet_dump_id(&id);
714 throw std::runtime_error(str.str());
718 std::string session::lookup(const std::string &data)
720 struct dnet_id id;
721 int error = -ENOENT, i, num, *g;
722 std::string ret;
724 transform(data, id);
725 id.type = 0;
727 num = dnet_mix_states(m_session, &id, &g);
728 if (num < 0)
729 throw std::bad_alloc();
731 for (i=0; i<num; ++i) {
732 try {
733 callback l;
734 id.group_id = g[i];
736 lookup(id, l);
737 ret = l.wait();
739 if (ret.size() < sizeof(struct dnet_addr) + sizeof(struct dnet_cmd)) {
740 std::stringstream str;
742 str << dnet_dump_id(&id) << ": failed to receive lookup request";
743 throw std::runtime_error(str.str());
745 /* reply parsing examaple */
746 #if 0
747 struct dnet_addr *addr = (struct dnet_addr *)ret.data();
748 struct dnet_cmd *cmd = (struct dnet_cmd *)(addr + 1);
750 if (cmd->size > sizeof(struct dnet_addr_attr)) {
751 struct dnet_addr_attr *a = (struct dnet_addr_attr *)(cmd + 1);
752 struct dnet_file_info *info = (struct dnet_file_info *)(a + 1);
754 dnet_convert_addr_attr(a);
755 dnet_convert_file_info(info);
757 #endif
758 dnet_log_raw(m_node->m_node, DNET_LOG_DEBUG, "%s: %s: %zu bytes\n", dnet_dump_id(&id), data.c_str(), ret.size());
759 error = 0;
760 break;
761 } catch (const std::exception &e) {
762 dnet_log_raw(m_node->m_node, DNET_LOG_ERROR, "%s: %s : %s\n", dnet_dump_id(&id), e.what(), data.c_str());
763 continue;
767 free(g);
769 if (error) {
770 std::ostringstream str;
771 str << data << ": could not find object";
773 throw std::runtime_error(str.str());
776 return ret;
779 std::string session::lookup(const struct dnet_id &id)
781 int error = -ENOENT;
782 std::string ret;
784 try {
785 callback l;
787 lookup(id, l);
788 ret = l.wait();
790 if (ret.size() < sizeof(struct dnet_addr) + sizeof(struct dnet_cmd)) {
791 std::stringstream str;
793 str << dnet_dump_id(&id) << ": failed to receive lookup request";
794 throw std::runtime_error(str.str());
797 dnet_log_raw(m_node->m_node, DNET_LOG_DEBUG, "%s: %zu bytes\n", dnet_dump_id(&id), ret.size());
798 error = 0;
799 } catch (const std::exception &e) {
800 dnet_log_raw(m_node->m_node, DNET_LOG_ERROR, "%s: %s\n", dnet_dump_id(&id), e.what());
803 if (error) {
804 std::ostringstream str;
805 str << dnet_dump_id(&id) << ": could not find object";
807 throw std::runtime_error(str.str());
810 return ret;
813 void session::remove_raw(struct dnet_id &id, uint64_t cflags, uint64_t ioflags)
815 int err = -ENOENT;
816 std::vector<int> g = groups;
818 for (int i=0; i<(int)g.size(); ++i) {
819 id.group_id = g[i];
821 if (!dnet_remove_object_now(m_session, &id, cflags, ioflags))
822 err = 0;
825 if (err) {
826 std::ostringstream str;
827 str << dnet_dump_id(&id) << ": REMOVE: " << err;
828 throw std::runtime_error(str.str());
832 void session::remove(struct dnet_id &id)
834 remove_raw(id, 0, 0);
837 void session::remove_raw(const std::string &data, int type, uint64_t cflags, uint64_t ioflags)
839 struct dnet_id id;
841 transform(data, id);
842 id.type = type;
844 remove_raw(id, cflags, ioflags);
847 void session::remove(const std::string &data, int type)
849 remove_raw(data, type, 0, 0);
852 std::string session::stat_log()
854 callback c;
855 std::string ret;
856 int err;
858 err = dnet_request_stat(m_session, NULL, DNET_CMD_STAT, 0,
859 callback::complete_callback, (void *)&c);
860 if (err < 0) {
861 std::ostringstream str;
862 str << "Failed to request statistics: " << err;
863 throw std::runtime_error(str.str());
866 ret = c.wait(err);
868 /* example reply parsing */
869 #if 0
870 float la[3];
871 const void *data = ret.data();
872 int size = ret.size();
873 char id_str[DNET_ID_SIZE*2 + 1];
874 char addr_str[128];
876 while (size) {
877 struct dnet_addr *addr = (struct dnet_addr *)data;
878 struct dnet_cmd *cmd = (struct dnet_cmd *)(addr + 1);
879 struct dnet_stat *st = (struct dnet_stat *)(cmd + 1);
881 dnet_convert_stat(st);
883 la[0] = (float)st->la[0] / 100.0;
884 la[1] = (float)st->la[1] / 100.0;
885 la[2] = (float)st->la[2] / 100.0;
887 printf("<stat addr=\"%s\" id=\"%s\"><la>%.2f %.2f %.2f</la>"
888 "<memtotal>%llu KB</memtotal><memfree>%llu KB</memfree><memcached>%llu KB</memcached>"
889 "<storage_size>%llu MB</storage_size><available_size>%llu MB</available_size>"
890 "<files>%llu</files><fsid>0x%llx</fsid></stat>",
891 dnet_server_convert_dnet_addr_raw(addr, addr_str, sizeof(addr_str)),
892 dnet_dump_id_len_raw(cmd->id.id, DNET_ID_SIZE, id_str),
893 la[0], la[1], la[2],
894 (unsigned long long)st->vm_total,
895 (unsigned long long)st->vm_free,
896 (unsigned long long)st->vm_cached,
897 (unsigned long long)(st->frsize * st->blocks / 1024 / 1024),
898 (unsigned long long)(st->bavail * st->bsize / 1024 / 1024),
899 (unsigned long long)st->files, (unsigned long long)st->fsid);
900 printf("\n");
902 int sz = sizeof(*addr) + sizeof(*cmd) + cmd->size;
904 size -= sz;
905 data += sz;
907 #endif
909 if (ret.size() < sizeof(struct dnet_addr) + sizeof(struct dnet_cmd) + sizeof(struct dnet_stat))
910 throw std::runtime_error("Failed to request statistics: not enough data returned");
911 return ret;
914 int session::state_num(void)
916 return dnet_state_num(m_session);
919 int session::request_cmd(struct dnet_trans_control &ctl)
921 int err;
923 err = dnet_request_cmd(m_session, &ctl);
924 if (err < 0) {
925 std::ostringstream str;
926 str << dnet_dump_id(&ctl.id) << ": failed to request cmd: " << dnet_cmd_string(ctl.cmd) << ": " << err;
927 throw std::runtime_error(str.str());
930 return err;
933 void session::update_status(const char *saddr, const int port, const int family, struct dnet_node_status *status)
935 int err;
936 struct dnet_addr addr;
937 char sport[16];
939 memset(&addr, 0, sizeof(addr));
940 addr.addr_len = sizeof(addr.addr);
942 snprintf(sport, sizeof(sport), "%d", port);
944 err = dnet_fill_addr(&addr, saddr, sport, family, SOCK_STREAM, IPPROTO_TCP);
945 if (!err)
946 err = dnet_update_status(m_session, &addr, NULL, status);
948 if (err < 0) {
949 std::ostringstream str;
950 str << saddr << ":" << port << ": failed to request set status " << std::hex << status << ": " << err;
951 throw std::runtime_error(str.str());
955 void session::update_status(struct dnet_id &id, struct dnet_node_status *status)
957 int err;
959 err = dnet_update_status(m_session, NULL, &id, status);
960 if (err < 0) {
961 std::ostringstream str;
963 str << dnet_dump_id(&id) << ": failed to request set status " << std::hex << status << ": " << err;
964 throw std::runtime_error(str.str());
968 struct range_sort_compare {
969 bool operator () (const std::string &s1, const std::string &s2) {
970 unsigned char *id1 = (unsigned char *)s1.data();
971 unsigned char *id2 = (unsigned char *)s2.data();
973 int cmp = dnet_id_cmp_str(id1, id2);
975 return cmp < 0;
979 std::vector<std::string> session::read_data_range(struct dnet_io_attr &io, int group_id, uint64_t cflags)
981 struct dnet_range_data *data;
982 uint64_t num = 0;
983 uint32_t ioflags = io.flags;
984 int err;
986 data = dnet_read_range(m_session, &io, group_id, cflags, &err);
987 if (!data && err) {
988 std::ostringstream str;
989 str << "Failed to read range data object: group: " << group_id <<
990 ", key: " << dnet_dump_id_str(io.id) <<
991 ", size: " << io.size << ": err: " << strerror(-err) << ": " << err;
992 throw std::runtime_error(str.str());
995 std::vector<std::string> ret;
997 if (data) {
998 try {
999 for (int i = 0; i < err; ++i) {
1000 struct dnet_range_data *d = &data[i];
1001 char *data = (char *)d->data;
1003 if (!(ioflags & DNET_IO_FLAGS_NODATA)) {
1004 while (d->size > sizeof(struct dnet_io_attr)) {
1005 struct dnet_io_attr *io = (struct dnet_io_attr *)data;
1007 dnet_convert_io_attr(io);
1009 std::string str;
1011 if (sizeof(struct dnet_io_attr) + io->size > d->size)
1013 std::ostringstream str;
1014 str << "read_data_range: incorrect data size: d->size = "
1015 << d->size << " io->size = "
1016 << io->size;
1017 throw std::runtime_error(str.str());
1020 str.append((char *)io->id, DNET_ID_SIZE);
1021 str.append((char *)&io->size, sizeof(io->size));
1022 str.append((const char *)(io + 1), io->size);
1024 ret.push_back(str);
1026 data += sizeof(struct dnet_io_attr) + io->size;
1027 d->size -= sizeof(struct dnet_io_attr) + io->size;
1029 } else {
1030 if (d->size != sizeof(struct dnet_io_attr)) {
1031 std::ostringstream str;
1032 str << "Incorrect data size: d->size = " << d->size <<
1033 "sizeof = " << sizeof(struct dnet_io_attr);
1034 throw std::runtime_error(str.str());
1036 struct dnet_io_attr *rep = (struct dnet_io_attr *)data;
1037 num += rep->num;
1040 for (int i = 0; i < err; ++i) {
1041 struct dnet_range_data *d = &data[i];
1042 free(d->data);
1044 free(data);
1045 } catch (const std::exception & e) {
1046 for (int i = 0; i < err; ++i) {
1047 struct dnet_range_data *d = &data[i];
1048 free(d->data);
1050 free(data);
1053 if (ioflags & DNET_IO_FLAGS_NODATA) {
1054 std::ostringstream str;
1055 str << num;
1056 ret.push_back(str.str());
1060 return ret;
1063 std::vector<struct dnet_io_attr> session::remove_data_range(struct dnet_io_attr &io, int group_id, uint64_t cflags)
1065 struct dnet_io_attr *retp;
1066 int ret_num;
1067 int err;
1069 retp = dnet_remove_range(m_session, &io, group_id, cflags, &ret_num, &err);
1071 if (!retp && err) {
1072 std::ostringstream str;
1073 str << "Failed to read range data object: group: " << group_id <<
1074 ", key: " << dnet_dump_id_str(io.id) <<
1075 ", size: " << io.size << ": err: " << strerror(-err) << ": " << err;
1076 throw std::runtime_error(str.str());
1079 std::vector<struct dnet_io_attr> ret;;
1081 if (retp) {
1082 for (int i = 0; i < ret_num; ++i) {
1083 ret.push_back(retp[i]);
1086 free(retp);
1089 return ret;
1092 std::string session::write_prepare(const struct dnet_id &id, const std::string &str, uint64_t remote_offset,
1093 uint64_t psize, uint64_t cflags, unsigned int ioflags)
1095 struct dnet_io_control ctl;
1097 memset(&ctl, 0, sizeof(ctl));
1099 ctl.cflags = cflags;
1100 ctl.data = str.data();
1102 ctl.io.flags = ioflags | DNET_IO_FLAGS_PREPARE | DNET_IO_FLAGS_PLAIN_WRITE;
1103 ctl.io.offset = remote_offset;
1104 ctl.io.size = str.size();
1105 ctl.io.type = id.type;
1106 ctl.io.num = psize;
1108 memcpy(&ctl.id, &id, sizeof(id));
1110 ctl.fd = -1;
1112 char *result = NULL;
1113 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
1114 if (err < 0) {
1115 std::ostringstream string;
1116 string << dnet_dump_id(&ctl.id) << ": write_prepare: size: " << str.size() << ", err: " << err;
1117 throw std::runtime_error(string.str());
1120 std::string ret(result, err);
1121 free(result);
1123 return ret;
1126 std::string session::write_prepare(const std::string &remote, const std::string &str, uint64_t remote_offset,
1127 uint64_t psize, uint64_t cflags, unsigned int ioflags, int type)
1129 struct dnet_id id;
1130 memset(&id, 0, sizeof(id));
1131 transform(remote, id);
1132 id.type = type;
1133 return write_prepare(id, str, remote_offset, psize, cflags, ioflags);
1136 std::string session::write_commit(const struct dnet_id &id, const std::string &str, uint64_t remote_offset, uint64_t csize,
1137 uint64_t cflags, unsigned int ioflags)
1139 struct dnet_io_control ctl;
1141 memset(&ctl, 0, sizeof(ctl));
1143 ctl.cflags = cflags;
1144 ctl.data = str.data();
1146 ctl.io.flags = ioflags | DNET_IO_FLAGS_COMMIT | DNET_IO_FLAGS_PLAIN_WRITE;
1147 ctl.io.offset = remote_offset;
1148 ctl.io.size = str.size();
1149 ctl.io.type = id.type;
1150 ctl.io.num = csize;
1152 memcpy(&ctl.id, &id, sizeof(id));
1154 ctl.fd = -1;
1156 char *result = NULL;
1157 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
1158 if (err < 0) {
1159 std::ostringstream string;
1160 string << dnet_dump_id(&ctl.id) << ": write_commit: size: " << str.size() << ", err: " << err;
1161 throw std::runtime_error(string.str());
1164 std::string ret(result, err);
1165 free(result);
1167 return ret;
1170 std::string session::write_commit(const std::string &remote, const std::string &str, uint64_t remote_offset, uint64_t csize,
1171 uint64_t cflags, unsigned int ioflags, int type)
1173 struct dnet_id id;
1174 memset(&id, 0, sizeof(id));
1175 transform(remote, id);
1176 id.type = type;
1177 return write_commit(id, str, remote_offset, csize, cflags, ioflags);
1180 std::string session::write_plain(const struct dnet_id &id, const std::string &str, uint64_t remote_offset,
1181 uint64_t cflags, unsigned int ioflags)
1183 struct dnet_io_control ctl;
1185 memset(&ctl, 0, sizeof(ctl));
1187 ctl.cflags = cflags;
1188 ctl.data = str.data();
1190 ctl.io.flags = ioflags | DNET_IO_FLAGS_PLAIN_WRITE;
1191 ctl.io.offset = remote_offset;
1192 ctl.io.size = str.size();
1193 ctl.io.type = id.type;
1195 memcpy(&ctl.id, &id, sizeof(id));
1197 ctl.fd = -1;
1199 char *result = NULL;
1200 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
1201 if (err < 0) {
1202 std::ostringstream string;
1203 string << dnet_dump_id(&ctl.id) << ": write_plain: size: " << str.size() << ", err: " << err;
1204 throw std::runtime_error(string.str());
1207 std::string ret(result, err);
1208 free(result);
1210 return ret;
1213 std::string session::write_plain(const std::string &remote, const std::string &str, uint64_t remote_offset,
1214 uint64_t cflags, unsigned int ioflags, int type)
1216 struct dnet_id id;
1217 memset(&id, 0, sizeof(id));
1218 transform(remote, id);
1219 id.type = type;
1220 return write_plain(id, str, remote_offset, cflags, ioflags);
1223 std::vector<std::pair<struct dnet_id, struct dnet_addr> > session::get_routes()
1225 std::vector<std::pair<struct dnet_id, struct dnet_addr> > res;
1226 struct dnet_id *ids = NULL;
1227 struct dnet_addr *addrs = NULL;
1229 int count = 0;
1231 count = dnet_get_routes(m_session, &ids, &addrs);
1233 if (count > 0) {
1234 for (int i = 0; i < count; ++i) {
1235 res.push_back(std::make_pair(ids[i], addrs[i]));
1239 if (ids)
1240 free(ids);
1242 if (addrs)
1243 free(addrs);
1245 return res;
1248 std::string session::request(struct dnet_id *id, struct sph *sph, bool lock)
1250 std::string ret_str;
1252 void *ret = NULL;
1253 int err;
1255 if (lock)
1256 err = dnet_send_cmd(m_session, id, sph, &ret);
1257 else
1258 err = dnet_send_cmd_nolock(m_session, id, sph, &ret);
1260 if (err < 0) {
1261 std::ostringstream str;
1263 str << dnet_dump_id(id) << ": failed to send request: " << strerror(-err) << ": " << err;
1264 throw std::runtime_error(str.str());
1267 if (ret && err) {
1268 try {
1269 ret_str.assign((char *)ret, err);
1270 } catch (...) {
1271 free(ret);
1272 throw;
1274 free(ret);
1277 return ret_str;
1280 std::string session::raw_exec(struct dnet_id *id, const struct sph *orig_sph,
1281 const std::string &event, const std::string &data, const std::string &binary, bool lock)
1283 std::vector<char> vec(event.size() + data.size() + binary.size() + sizeof(struct sph));
1284 std::string ret_str;
1286 struct sph *sph = (struct sph *)&vec[0];
1288 memset(sph, 0, sizeof(struct sph));
1289 if (orig_sph) {
1290 *sph = *orig_sph;
1291 sph->flags &= ~DNET_SPH_FLAGS_SRC_BLOCK;
1292 } else if (id) {
1293 sph->flags = DNET_SPH_FLAGS_SRC_BLOCK;
1294 memcpy(sph->src.id, id->id, sizeof(sph->src.id));
1297 sph->data_size = data.size();
1298 sph->binary_size = binary.size();
1299 sph->event_size = event.size();
1301 memcpy(sph->data, event.data(), event.size());
1302 memcpy(sph->data + event.size(), data.data(), data.size());
1303 memcpy(sph->data + event.size() + data.size(), binary.data(), binary.size());
1305 return request(id, sph, lock);
1308 std::string session::exec_locked(struct dnet_id *id, const std::string &event, const std::string &data, const std::string &binary)
1310 return raw_exec(id, NULL, event, data, binary, true);
1313 std::string session::exec_unlocked(struct dnet_id *id, const std::string &event, const std::string &data, const std::string &binary)
1315 return raw_exec(id, NULL, event, data, binary, false);
1318 std::string session::push_locked(struct dnet_id *id, const struct sph &sph, const std::string &event,
1319 const std::string &data, const std::string &binary)
1321 return raw_exec(id, &sph, event, data, binary, true);
1324 std::string session::push_unlocked(struct dnet_id *id, const struct sph &sph, const std::string &event,
1325 const std::string &data, const std::string &binary)
1327 return raw_exec(id, &sph, event, data, binary, false);
1330 void session::reply(const struct sph &orig_sph, const std::string &event, const std::string &data, const std::string &binary)
1332 std::vector<char> vec(event.size() + data.size() + binary.size() + sizeof(struct sph));
1333 std::string ret_str;
1335 struct sph *sph = (struct sph *)&vec[0];
1337 *sph = orig_sph;
1339 sph->data_size = data.size();
1340 sph->binary_size = binary.size();
1341 sph->event_size = event.size();
1343 memcpy(sph->data, event.data(), event.size());
1344 memcpy(sph->data + event.size(), data.data(), data.size());
1345 memcpy(sph->data + event.size() + data.size(), binary.data(), binary.size());
1347 struct dnet_id id;
1348 dnet_setup_id(&id, 0, sph->src.id);
1349 id.type = 0;
1351 request(&id, sph, false);
1354 namespace {
1355 bool dnet_io_attr_compare(const struct dnet_io_attr &io1, const struct dnet_io_attr &io2) {
1356 int cmp;
1358 cmp = dnet_id_cmp_str(io1.id, io2.id);
1359 return cmp < 0;
1363 std::vector<std::string> session::bulk_read(const std::vector<struct dnet_io_attr> &ios, uint64_t cflags)
1365 struct dnet_range_data *data;
1366 int num, *g, err;
1368 num = dnet_mix_states(m_session, NULL, &g);
1369 if (num < 0)
1370 throw std::runtime_error("could not fetch groups: " + std::string(strerror(num)));
1372 std::vector<int> groups;
1373 try {
1374 groups.assign(g, g + num);
1375 free(g);
1376 } catch (...) {
1377 free(g);
1378 throw;
1381 std::vector<struct dnet_io_attr> tmp_ios = ios;
1382 std::sort(tmp_ios.begin(), tmp_ios.end(), dnet_io_attr_compare);
1384 std::vector<std::string> ret;
1386 for (std::vector<int>::iterator group = groups.begin(); group != groups.end(); ++group) {
1387 if (!tmp_ios.size())
1388 break;
1390 data = dnet_bulk_read(m_session, (struct dnet_io_attr *)(&tmp_ios[0]), tmp_ios.size(), *group, cflags, &err);
1391 if (!data && err) {
1392 std::ostringstream str;
1393 str << "Failed to read bulk data: group: " << *group <<
1394 ": err: " << strerror(-err) << ": " << err;
1395 throw std::runtime_error(str.str());
1398 if (data) {
1399 for (int i = 0; i < err; ++i) {
1400 struct dnet_range_data *d = &data[i];
1401 char *data = (char *)d->data;
1403 while (d->size) {
1404 struct dnet_io_attr *io = (struct dnet_io_attr *)data;
1406 for (std::vector<struct dnet_io_attr>::iterator it = tmp_ios.begin(); it != tmp_ios.end(); ++it) {
1407 int cmp = dnet_id_cmp_str(it->id, io->id);
1409 if (cmp == 0) {
1410 tmp_ios.erase(it);
1411 break;
1415 dnet_convert_io_attr(io);
1417 uint64_t size = dnet_bswap64(io->size);
1419 std::string str;
1421 str.append((char *)io->id, DNET_ID_SIZE);
1422 str.append((char *)&size, 8);
1423 str.append((const char *)(io + 1), io->size);
1425 ret.push_back(str);
1427 data += sizeof(struct dnet_io_attr) + io->size;
1428 d->size -= sizeof(struct dnet_io_attr) + io->size;
1431 free(d->data);
1434 free(data);
1438 return ret;
1441 std::vector<std::string> session::bulk_read(const std::vector<std::string> &keys, uint64_t cflags)
1443 std::vector<struct dnet_io_attr> ios;
1444 struct dnet_io_attr io;
1445 memset(&io, 0, sizeof(io));
1447 ios.reserve(keys.size());
1449 for (size_t i = 0; i < keys.size(); ++i) {
1450 struct dnet_id id;
1452 transform(keys[i], id);
1453 memcpy(io.id, id.id, sizeof(io.id));
1454 ios.push_back(io);
1457 return bulk_read(ios, cflags);
1460 std::string session::bulk_write(const std::vector<struct dnet_io_attr> &ios, const std::vector<std::string> &data, uint64_t cflags)
1462 std::vector<struct dnet_io_control> ctls;
1463 unsigned int i;
1464 int err;
1466 if (ios.size() != data.size()) {
1467 std::ostringstream string;
1468 string << "BULK_WRITE: ios doesn't meet data: io.size: " << ios.size() << ", data.size: " << data.size();
1469 throw std::runtime_error(string.str());
1472 ctls.reserve(ios.size());
1474 for(i = 0; i < ios.size(); ++i) {
1475 struct dnet_io_control ctl;
1476 memset(&ctl, 0, sizeof(ctl));
1478 ctl.cflags = cflags;
1479 ctl.data = data[i].data();
1481 ctl.io = ios[i];
1483 dnet_setup_id(&ctl.id, 0, (unsigned char *)ios[i].id);
1484 ctl.id.type = ios[i].type;
1486 ctl.fd = -1;
1488 ctls.push_back(ctl);
1491 struct dnet_range_data ret = dnet_bulk_write(m_session, &ctls[0], ctls.size(), &err);
1492 if (err < 0) {
1493 std::ostringstream string;
1494 string << "BULK_WRITE: size: " << ret.size << ", err: " << err;
1495 throw std::runtime_error(string.str());
1498 std::string ret_str((const char *)ret.data, ret.size);
1499 free(ret.data);
1501 return ret_str;
1504 struct dnet_node * session::get_node()
1506 return m_node->m_node;