Merge branch 'master' of https://github.com/resetius/elliptics
[elliptics.git] / bindings / cpp / node.cpp
blobf6b141546e634101eada45d53b84e24e08c09fcc
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #define _XOPEN_SOURCE 600
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/mman.h>
22 #include <sys/wait.h>
24 #include <errno.h>
25 #include <ctype.h>
26 #include <dirent.h>
27 #include <fcntl.h>
28 #include <pthread.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <unistd.h>
34 #include <algorithm>
35 #include <iostream>
36 #include <stdexcept>
37 #include <string>
38 #include <sstream>
39 #include <vector>
41 #include <boost/algorithm/string.hpp>
42 #include <elliptics/cppdef.h>
44 using namespace ioremap::elliptics;
46 node::node(logger &l) : m_node(NULL), m_log(NULL)
48 struct dnet_config cfg;
50 memset(&cfg, 0, sizeof(cfg));
52 cfg.sock_type = SOCK_STREAM;
53 cfg.proto = IPPROTO_TCP;
54 cfg.wait_timeout = 5;
55 cfg.check_timeout = 20;
57 m_log = reinterpret_cast<logger *>(l.clone());
58 cfg.log = m_log->get_dnet_log();
60 snprintf(cfg.addr, sizeof(cfg.addr), "0.0.0.0");
61 snprintf(cfg.port, sizeof(cfg.port), "0");
63 m_node = dnet_node_create(&cfg);
64 if (!m_node) {
65 delete m_log;
66 throw std::bad_alloc();
70 node::node(logger &l, struct dnet_config &cfg) : m_node(NULL), m_log(NULL)
72 cfg.sock_type = SOCK_STREAM;
73 cfg.proto = IPPROTO_TCP;
75 m_log = reinterpret_cast<logger *>(l.clone());
76 cfg.log = m_log->get_dnet_log();
78 snprintf(cfg.addr, sizeof(cfg.addr), "0.0.0.0");
79 snprintf(cfg.port, sizeof(cfg.port), "0");
81 m_node = dnet_node_create(&cfg);
82 if (!m_node) {
83 delete m_log;
84 throw std::bad_alloc();
88 node::node(logger &l, const std::string &config_path) : m_node(NULL), m_log(NULL)
90 struct dnet_config cfg;
91 memset(&cfg, 0, sizeof(struct dnet_config));
93 cfg.sock_type = SOCK_STREAM;
94 cfg.proto = IPPROTO_TCP;
96 m_log = reinterpret_cast<logger *>(l.clone());
97 cfg.log = m_log->get_dnet_log();
99 std::list<addr_tuple> remotes;
100 std::vector<int> groups;
102 parse_config(config_path, cfg, remotes, groups, cfg.log->log_level);
104 m_node = dnet_node_create(&cfg);
105 if (!m_node) {
106 delete m_log;
107 throw std::bad_alloc();
110 for (std::list<addr_tuple>::iterator it = remotes.begin(); it != remotes.end(); ++it) {
111 try {
112 add_remote(it->host.c_str(), it->port, it->family);
113 } catch (...) {
114 continue;
119 node::~node()
121 dnet_node_destroy(m_node);
122 delete m_log;
125 session::session(node &n) : m_node(&n)
127 m_session = dnet_session_create(m_node->m_node);
129 if (!m_session)
130 throw std::bad_alloc();
133 session::~session()
137 void node::parse_config(const std::string &path, struct dnet_config &cfg,
138 std::list<addr_tuple> &remotes,
139 std::vector<int> &groups,
140 int &log_level)
142 std::ifstream in(path.c_str());
143 std::string line;
144 int line_num = 0;
146 while (!in.eof() && in.good()) {
147 line.resize(1024);
149 in.getline((char *)line.data(), line.size());
150 size_t len = in.gcount();
152 line.resize(len);
154 if (in.eof() || !in.good())
155 break;
157 boost::trim(line);
158 line_num++;
160 if (line.size() < 3 || line.data()[0] == '#')
161 continue;
163 std::vector<std::string> strs;
164 boost::split(strs, line, boost::is_any_of("="));
166 std::string key = strs[0];
167 boost::trim(key);
169 if (strs.size() != 2) {
170 std::ostringstream str;
171 str << path << ": invalid elliptics config: line: " << line_num <<
172 ", key: " << key << "': string is broken: size: " << strs.size();
173 throw std::runtime_error(str.str());
175 std::string value = strs[1];
176 boost::trim(value);
178 if (key == "remote") {
179 std::vector<std::string> rem;
180 boost::split(rem, value, boost::is_any_of(" "));
182 for (std::vector<std::string>::iterator it = rem.begin(); it != rem.end(); ++it) {
183 std::string addr_str = *it;
184 if (dnet_parse_addr((char *)addr_str.c_str(), &cfg)) {
185 std::ostringstream str;
186 str << path << ": invalid elliptics config: '" << key << "' ";
187 str << path << ": invalid elliptics config: line: " << line_num <<
188 ", key: '" << key << "': remote addr is invalid";
189 throw std::runtime_error(str.str());
192 addr_tuple addr(cfg.addr, atoi(cfg.port), cfg.family);
193 remotes.push_back(addr);
197 if (key == "groups") {
198 std::vector<std::string> gr;
199 boost::split(gr, value, boost::is_any_of(":"));
201 for (std::vector<std::string>::iterator it = gr.begin(); it != gr.end(); ++it) {
202 int group = atoi(it->c_str());
204 if (group != 0)
205 groups.push_back(group);
209 if (key == "check_timeout")
210 cfg.check_timeout = strtoul(value.c_str(), NULL, 0);
211 if (key == "wait_timeout")
212 cfg.wait_timeout = strtoul(value.c_str(), NULL, 0);
213 if (key == "log_level")
214 log_level = strtoul(value.c_str(), NULL, 0);
218 void node::add_remote(const char *addr, const int port, const int family)
220 struct dnet_config cfg;
221 int err;
223 memset(&cfg, 0, sizeof(cfg));
225 cfg.family = family;
226 snprintf(cfg.addr, sizeof(cfg.addr), "%s", addr);
227 snprintf(cfg.port, sizeof(cfg.port), "%d", port);
229 err = dnet_add_state(m_node, &cfg);
230 if (err) {
231 std::ostringstream str;
232 str << "Failed to add remote addr " << addr << ":" << port << ": " << err;
233 throw std::runtime_error(str.str());
237 void node::set_timeouts(const int wait_timeout, const int check_timeout)
239 dnet_set_timeouts(m_node, wait_timeout, check_timeout);
242 void session::add_groups(std::vector<int> &groups)
244 if (dnet_session_set_groups(m_session, (int *)&groups[0], groups.size()))
245 throw std::bad_alloc();
246 this->groups = groups;
249 void session::read_file(struct dnet_id &id, const std::string &file, uint64_t offset, uint64_t size)
251 int err;
253 err = dnet_read_file_id(m_session, file.c_str(), &id, offset, size);
254 if (err) {
255 std::ostringstream str;
256 str << dnet_dump_id(&id) << ": READ: " << file << ": offset: " << offset << ", size: " << size << ": " << err;
257 throw std::runtime_error(str.str());
261 void session::read_file(const std::string &remote, const std::string &file, uint64_t offset, uint64_t size, int type)
263 int err;
265 err = dnet_read_file(m_session, file.c_str(), remote.data(), remote.size(), offset, size, type);
266 if (err) {
267 struct dnet_id id;
268 transform(remote, id);
269 id.type = 0;
271 std::ostringstream str;
272 str << dnet_dump_id(&id) << ": READ: " << file << ": offset: " << offset << ", size: " << size << ": " << err;
273 throw std::runtime_error(str.str());
277 void session::write_file(struct dnet_id &id, const std::string &file, uint64_t local_offset,
278 uint64_t offset, uint64_t size, uint64_t cflags, unsigned int ioflags)
280 int err = dnet_write_file_id(m_session, file.c_str(), &id, local_offset, offset, size, cflags, ioflags);
281 if (err) {
282 std::ostringstream str;
283 str << dnet_dump_id(&id) << ": WRITE: " << file << ", local_offset: " << local_offset <<
284 ", offset: " << offset << ", size: " << size << ": " << err;
285 throw std::runtime_error(str.str());
288 void session::write_file(const std::string &remote, const std::string &file, uint64_t local_offset, uint64_t offset, uint64_t size,
289 uint64_t cflags, unsigned int ioflags, int type)
291 int err = dnet_write_file(m_session, file.c_str(), remote.data(), remote.size(),
292 local_offset, offset, size, cflags, ioflags, type);
293 if (err) {
294 struct dnet_id id;
295 transform(remote, id);
296 id.type = 0;
298 std::ostringstream str;
299 str << dnet_dump_id(&id) << ": WRITE: " << file << ", local_offset: " << local_offset <<
300 ", offset: " << offset << ", size: " << size << ": " << err;
301 throw std::runtime_error(str.str());
305 std::string session::read_data_wait(struct dnet_id &id, uint64_t offset, uint64_t size,
306 uint64_t cflags, uint32_t ioflags)
308 struct dnet_io_attr io;
309 int err;
311 memset(&io, 0, sizeof(io));
312 io.size = size;
313 io.offset = offset;
314 io.flags = ioflags;
315 io.type = id.type;
317 memcpy(io.id, id.id, DNET_ID_SIZE);
318 memcpy(io.parent, id.id, DNET_ID_SIZE);
320 void *data = dnet_read_data_wait(m_session, &id, &io, cflags, &err);
321 if (!data) {
322 std::ostringstream str;
323 str << dnet_dump_id(&id) << ": READ: size: " << size << ": err: " << strerror(-err) << ": " << err;
324 throw std::runtime_error(str.str());
327 std::string ret = std::string((const char *)data + sizeof(struct dnet_io_attr), io.size - sizeof(struct dnet_io_attr));
328 free(data);
330 return ret;
333 std::string session::read_data_wait(const std::string &remote, uint64_t offset, uint64_t size,
334 uint64_t cflags, uint32_t ioflags, int type)
336 struct dnet_id id;
338 transform(remote, id);
339 id.type = type;
341 return read_data_wait(id, offset, size, cflags, ioflags);
344 void session::prepare_latest(struct dnet_id &id, uint64_t cflags, std::vector<int> &groups)
346 struct dnet_read_latest_prepare pr;
347 int err;
349 memset(&pr, 0, sizeof(struct dnet_read_latest_prepare));
351 pr.s = m_session;
352 pr.id = id;
353 pr.cflags = cflags;
355 pr.group = (int *)malloc(groups.size() * sizeof(int));
356 if (!pr.group) {
357 std::ostringstream str;
359 str << dnet_dump_id(&id) << ": prepare_latest: allocation failure: group num: " << groups.size();
360 throw std::runtime_error(str.str());
362 pr.group_num = groups.size();
364 for (unsigned i = 0; i < groups.size(); ++i)
365 pr.group[i] = groups[i];
367 err = dnet_read_latest_prepare(&pr);
368 if (!err) {
369 try {
370 groups.clear();
372 for (int i = 0; i < pr.group_num; ++i)
373 groups.push_back(pr.group[i]);
374 } catch (...) {
375 free(pr.group);
376 throw;
380 free(pr.group);
382 if (!groups.size())
383 err = -ENOENT;
385 if (err) {
386 std::ostringstream str;
388 str << dnet_dump_id(&id) << ": prepare_latest: groups: " << groups.size() << ": err: " << strerror(-err) << ": " << err;
389 throw std::runtime_error(str.str());
393 std::string session::read_latest(struct dnet_id &id, uint64_t offset, uint64_t size,
394 uint64_t cflags, uint32_t ioflags)
396 struct dnet_io_attr io;
397 void *data;
398 int err;
400 memset(&io, 0, sizeof(io));
401 io.size = size;
402 io.offset = offset;
403 io.flags = ioflags;
404 io.type = id.type;
405 io.num = groups.size();
407 memcpy(io.id, id.id, DNET_ID_SIZE);
408 memcpy(io.parent, id.id, DNET_ID_SIZE);
410 err = dnet_read_latest(m_session, &id, &io, cflags, &data);
411 if (err < 0) {
412 std::ostringstream str;
413 str << dnet_dump_id(&id) << ": READ: size: " << size << ": err: " << strerror(-err) << ": " << err;
414 throw std::runtime_error(str.str());
417 std::string ret = std::string((const char *)data + sizeof(struct dnet_io_attr), io.size - sizeof(struct dnet_io_attr));
418 free(data);
420 return ret;
423 std::string session::read_latest(const std::string &remote, uint64_t offset, uint64_t size,
424 uint64_t cflags, uint32_t ioflags, int type)
426 struct dnet_id id;
428 transform(remote, id);
429 id.type = type;
431 return read_latest(id, offset, size, cflags, ioflags);
434 std::string session::write_cache(struct dnet_id &id, const std::string &str,
435 uint64_t cflags, unsigned int ioflags, long timeout)
437 struct dnet_io_control ctl;
439 memset(&ctl, 0, sizeof(ctl));
441 ctl.cflags = cflags;
442 ctl.data = str.data();
444 ctl.io.flags = ioflags | DNET_IO_FLAGS_CACHE;
445 ctl.io.start = timeout;
446 ctl.io.size = str.size();
447 ctl.io.type = id.type;
448 ctl.io.num = str.size();
450 memcpy(&ctl.id, &id, sizeof(struct dnet_id));
452 ctl.fd = -1;
454 char *result = NULL;
455 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
456 if (err < 0) {
457 std::ostringstream string;
458 string << dnet_dump_id(&id) << ": WRITE: size: " << str.size() << ", err: " << err;
459 throw std::runtime_error(string.str());
462 std::string ret((const char *)result, err);
463 free(result);
465 return ret;
468 std::string session::write_cache(const std::string &key, const std::string &str,
469 uint64_t cflags, unsigned int ioflags, long timeout)
471 struct dnet_id id;
473 transform(key, id);
474 id.type = 0;
475 id.group_id = 0;
477 return write_cache(id, str, cflags, ioflags, timeout);
480 std::string session::write_compare_and_swap(const struct dnet_id &id, const std::string &str,
481 const dnet_id &old_csum, uint64_t remote_offset, uint64_t cflags, unsigned int ioflags) {
482 struct dnet_io_control ctl;
484 memset(&ctl, 0, sizeof(ctl));
486 ctl.cflags = cflags;
487 ctl.data = str.data();
489 ctl.io.flags = ioflags | DNET_IO_FLAGS_COMPARE_AND_SWAP;
490 ctl.io.offset = remote_offset;
491 ctl.io.size = str.size();
492 ctl.io.type = id.type;
493 ctl.io.num = str.size() + remote_offset;
495 memcpy(&ctl.id, &id, sizeof(struct dnet_id));
496 memcpy(&ctl.io.parent, &old_csum.id, DNET_ID_SIZE);
498 ctl.fd = -1;
500 char *result = NULL;
501 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
502 if (err < 0) {
503 std::ostringstream string;
504 string << dnet_dump_id(&id) << ": WRITE: size: " << str.size() << ", err: " << err;
505 throw std::runtime_error(string.str());
508 std::string ret((const char *)result, err);
509 free(result);
511 return ret;
514 std::string session::write_compare_and_swap(const std::string &remote, const std::string &str, const struct dnet_id &old_csum,
515 uint64_t remote_offset, uint64_t cflags, unsigned int ioflags, int type)
517 struct dnet_id id;
519 transform(remote, id);
520 id.type = type;
521 id.group_id = 0;
523 return write_compare_and_swap(id, str, old_csum, remote_offset, cflags, ioflags);
526 std::string session::write_data_wait(struct dnet_id &id, const std::string &str,
527 uint64_t remote_offset, uint64_t cflags, unsigned int ioflags)
529 struct dnet_io_control ctl;
531 memset(&ctl, 0, sizeof(ctl));
533 ctl.cflags = cflags;
534 ctl.data = str.data();
536 ctl.io.flags = ioflags;
537 ctl.io.offset = remote_offset;
538 ctl.io.size = str.size();
539 ctl.io.type = id.type;
540 ctl.io.num = str.size() + remote_offset;
542 memcpy(&ctl.id, &id, sizeof(struct dnet_id));
544 ctl.fd = -1;
546 char *result = NULL;
547 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
548 if (err < 0) {
549 std::ostringstream string;
550 string << dnet_dump_id(&id) << ": WRITE: size: " << str.size() << ", err: " << err;
551 throw std::runtime_error(string.str());
554 std::string ret((const char *)result, err);
555 free(result);
557 return ret;
560 std::string session::write_data_wait(const std::string &remote, const std::string &str,
561 uint64_t remote_offset, uint64_t cflags, unsigned int ioflags, int type)
563 struct dnet_id id;
565 transform(remote, id);
566 id.type = type;
567 id.group_id = 0;
569 return write_data_wait(id, str, remote_offset, cflags, ioflags);
572 std::string session::lookup_addr(const std::string &remote, const int group_id)
574 char buf[128];
576 int err = dnet_lookup_addr(m_session, remote.data(), remote.size(), NULL, group_id, buf, sizeof(buf));
577 if (err < 0) {
578 std::ostringstream str;
579 str << "Failed to lookup in group " << group_id << ": key size: " << remote.size() << ", err: " << err;
580 throw std::runtime_error(str.str());
583 return std::string((const char *)buf, strlen(buf));
586 std::string session::lookup_addr(const struct dnet_id &id)
588 char buf[128];
590 int err = dnet_lookup_addr(m_session, NULL, 0, (struct dnet_id *)&id, id.group_id, buf, sizeof(buf));
591 if (err < 0) {
592 std::ostringstream str;
593 str << "Failed to lookup " << dnet_dump_id(&id) << ": err: " << err;
594 throw std::runtime_error(str.str());
597 return std::string((const char *)buf, strlen(buf));
600 std::string session::create_metadata(const struct dnet_id &id, const std::string &obj,
601 const std::vector<int> &groups, const struct timespec &ts)
603 struct dnet_metadata_control ctl;
604 struct dnet_meta_container mc;
605 int err;
607 memset(&mc, 0, sizeof(struct dnet_meta_container));
608 memset(&ctl, 0, sizeof(struct dnet_metadata_control));
610 ctl.obj = (char *)obj.data();
611 ctl.len = obj.size();
613 ctl.groups = (int *)&groups[0];
614 ctl.group_num = groups.size();
616 ctl.ts = ts;
617 ctl.id = id;
619 err = dnet_create_metadata(m_session, &ctl, &mc);
620 if (err) {
621 std::ostringstream str;
622 str << "Failed to create metadata: key: " << dnet_dump_id(&id) << ", err: " << err;
623 throw std::runtime_error(str.str());
626 std::string ret;
628 try {
629 ret.assign((char *)mc.data, mc.size);
630 } catch (...) {
631 free(mc.data);
632 throw;
635 free(mc.data);
636 return ret;
639 int session::write_metadata(const struct dnet_id &id, const std::string &obj,
640 const std::vector<int> &groups, const struct timespec &ts, uint64_t cflags)
642 int err;
643 std::string meta;
644 struct dnet_meta_container mc;
646 if (dnet_flags(m_node->m_node) & DNET_CFG_NO_META)
647 return 0;
649 meta = create_metadata(id, obj, groups, ts);
651 mc.data = (void *)meta.data();
652 mc.size = meta.size();
654 mc.id = id;
656 err = dnet_write_metadata(m_session, &mc, 1, cflags);
657 if (err) {
658 std::ostringstream str;
659 str << "Failed to write metadata: key: " << dnet_dump_id(&id) << ", err: " << err;
660 throw std::runtime_error(str.str());
663 return 0;
666 void session::transform(const std::string &data, struct dnet_id &id)
668 dnet_transform(m_node->m_node, (void *)data.data(), data.size(), &id);
671 void session::lookup(const struct dnet_id &id, const callback &c)
673 int err = dnet_lookup_object(m_session, (struct dnet_id *)&id, 0,
674 callback::complete_callback,
675 (void *)&c);
677 if (err) {
678 std::ostringstream str;
679 str << "Failed to lookup ID " << dnet_dump_id(&id) << ": " << err;
680 throw std::runtime_error(str.str());
684 void session::lookup(const std::string &data, const callback &c)
686 struct dnet_id id;
687 int error = -ENOENT, i, num, *g;
689 transform(data, id);
690 id.type = 0;
692 num = dnet_mix_states(m_session, &id, &g);
693 if (num < 0)
694 throw std::bad_alloc();
696 for (i=0; i<num; ++i) {
697 id.group_id = g[i];
699 try {
700 lookup(id, c);
701 } catch (...) {
702 continue;
705 error = 0;
706 break;
709 free(g);
711 if (error) {
712 std::ostringstream str;
713 str << "Failed to lookup data object: key: " << dnet_dump_id(&id);
714 throw std::runtime_error(str.str());
718 std::string session::lookup(const std::string &data)
720 struct dnet_id id;
721 int error = -ENOENT, i, num, *g;
722 std::string ret;
724 transform(data, id);
725 id.type = 0;
727 num = dnet_mix_states(m_session, &id, &g);
728 if (num < 0)
729 throw std::bad_alloc();
731 for (i=0; i<num; ++i) {
732 try {
733 callback l;
734 id.group_id = g[i];
736 lookup(id, l);
737 ret = l.wait();
739 if (ret.size() < sizeof(struct dnet_addr) + sizeof(struct dnet_cmd)) {
740 std::stringstream str;
742 str << dnet_dump_id(&id) << ": failed to receive lookup request";
743 throw std::runtime_error(str.str());
745 /* reply parsing examaple */
746 #if 0
747 struct dnet_addr *addr = (struct dnet_addr *)ret.data();
748 struct dnet_cmd *cmd = (struct dnet_cmd *)(addr + 1);
750 if (cmd->size > sizeof(struct dnet_addr_attr)) {
751 struct dnet_addr_attr *a = (struct dnet_addr_attr *)(cmd + 1);
752 struct dnet_file_info *info = (struct dnet_file_info *)(a + 1);
754 dnet_convert_addr_attr(a);
755 dnet_convert_file_info(info);
757 #endif
758 dnet_log_raw(m_node->m_node, DNET_LOG_DEBUG, "%s: %s: %zu bytes\n", dnet_dump_id(&id), data.c_str(), ret.size());
759 error = 0;
760 break;
761 } catch (const std::exception &e) {
762 dnet_log_raw(m_node->m_node, DNET_LOG_ERROR, "%s: %s : %s\n", dnet_dump_id(&id), e.what(), data.c_str());
763 continue;
767 free(g);
769 if (error) {
770 std::ostringstream str;
771 str << data << ": could not find object";
773 throw std::runtime_error(str.str());
776 return ret;
779 std::string session::lookup(const struct dnet_id &id)
781 int error = -ENOENT;
782 std::string ret;
784 try {
785 callback l;
787 lookup(id, l);
788 ret = l.wait();
790 if (ret.size() < sizeof(struct dnet_addr) + sizeof(struct dnet_cmd)) {
791 std::stringstream str;
793 str << dnet_dump_id(&id) << ": failed to receive lookup request";
794 throw std::runtime_error(str.str());
797 dnet_log_raw(m_node->m_node, DNET_LOG_DEBUG, "%s: %zu bytes\n", dnet_dump_id(&id), ret.size());
798 error = 0;
799 } catch (const std::exception &e) {
800 dnet_log_raw(m_node->m_node, DNET_LOG_ERROR, "%s: %s\n", dnet_dump_id(&id), e.what());
803 if (error) {
804 std::ostringstream str;
805 str << dnet_dump_id(&id) << ": could not find object";
807 throw std::runtime_error(str.str());
810 return ret;
813 void session::remove_raw(struct dnet_id &id, uint64_t cflags, uint64_t ioflags)
815 int err = -ENOENT;
816 std::vector<int> g = groups;
818 for (int i=0; i<(int)g.size(); ++i) {
819 id.group_id = g[i];
821 if (!dnet_remove_object_now(m_session, &id, cflags, ioflags))
822 err = 0;
825 if (err) {
826 std::ostringstream str;
827 str << dnet_dump_id(&id) << ": REMOVE: " << err;
828 throw std::runtime_error(str.str());
832 void session::remove(struct dnet_id &id)
834 remove_raw(id, 0, 0);
837 void session::remove_raw(const std::string &data, int type, uint64_t cflags, uint64_t ioflags)
839 struct dnet_id id;
841 transform(data, id);
842 id.type = type;
844 remove_raw(id, cflags, ioflags);
847 void session::remove(const std::string &data, int type)
849 remove_raw(data, type, 0, 0);
852 std::string session::stat_log()
854 callback c;
855 std::string ret;
856 int err;
858 err = dnet_request_stat(m_session, NULL, DNET_CMD_STAT, 0,
859 callback::complete_callback, (void *)&c);
860 if (err < 0) {
861 std::ostringstream str;
862 str << "Failed to request statistics: " << err;
863 throw std::runtime_error(str.str());
866 ret = c.wait(err);
868 /* example reply parsing */
869 #if 0
870 float la[3];
871 const void *data = ret.data();
872 int size = ret.size();
873 char id_str[DNET_ID_SIZE*2 + 1];
874 char addr_str[128];
876 while (size) {
877 struct dnet_addr *addr = (struct dnet_addr *)data;
878 struct dnet_cmd *cmd = (struct dnet_cmd *)(addr + 1);
879 struct dnet_stat *st = (struct dnet_stat *)(cmd + 1);
881 dnet_convert_stat(st);
883 la[0] = (float)st->la[0] / 100.0;
884 la[1] = (float)st->la[1] / 100.0;
885 la[2] = (float)st->la[2] / 100.0;
887 printf("<stat addr=\"%s\" id=\"%s\"><la>%.2f %.2f %.2f</la>"
888 "<memtotal>%llu KB</memtotal><memfree>%llu KB</memfree><memcached>%llu KB</memcached>"
889 "<storage_size>%llu MB</storage_size><available_size>%llu MB</available_size>"
890 "<files>%llu</files><fsid>0x%llx</fsid></stat>",
891 dnet_server_convert_dnet_addr_raw(addr, addr_str, sizeof(addr_str)),
892 dnet_dump_id_len_raw(cmd->id.id, DNET_ID_SIZE, id_str),
893 la[0], la[1], la[2],
894 (unsigned long long)st->vm_total,
895 (unsigned long long)st->vm_free,
896 (unsigned long long)st->vm_cached,
897 (unsigned long long)(st->frsize * st->blocks / 1024 / 1024),
898 (unsigned long long)(st->bavail * st->bsize / 1024 / 1024),
899 (unsigned long long)st->files, (unsigned long long)st->fsid);
900 printf("\n");
902 int sz = sizeof(*addr) + sizeof(*cmd) + cmd->size;
904 size -= sz;
905 data += sz;
907 #endif
909 if (ret.size() < sizeof(struct dnet_addr) + sizeof(struct dnet_cmd) + sizeof(struct dnet_stat))
910 throw std::runtime_error("Failed to request statistics: not enough data returned");
911 return ret;
914 int session::state_num(void)
916 return dnet_state_num(m_session);
919 int session::request_cmd(struct dnet_trans_control &ctl)
921 int err;
923 err = dnet_request_cmd(m_session, &ctl);
924 if (err < 0) {
925 std::ostringstream str;
926 str << dnet_dump_id(&ctl.id) << ": failed to request cmd: " << dnet_cmd_string(ctl.cmd) << ": " << err;
927 throw std::runtime_error(str.str());
930 return err;
933 void session::update_status(const char *saddr, const int port, const int family, struct dnet_node_status *status)
935 int err;
936 struct dnet_addr addr;
937 char sport[16];
939 memset(&addr, 0, sizeof(addr));
940 addr.addr_len = sizeof(addr.addr);
942 snprintf(sport, sizeof(sport), "%d", port);
944 err = dnet_fill_addr(&addr, saddr, sport, family, SOCK_STREAM, IPPROTO_TCP);
945 if (!err)
946 err = dnet_update_status(m_session, &addr, NULL, status);
948 if (err < 0) {
949 std::ostringstream str;
950 str << saddr << ":" << port << ": failed to request set status " << std::hex << status << ": " << err;
951 throw std::runtime_error(str.str());
955 void session::update_status(struct dnet_id &id, struct dnet_node_status *status)
957 int err;
959 err = dnet_update_status(m_session, NULL, &id, status);
960 if (err < 0) {
961 std::ostringstream str;
963 str << dnet_dump_id(&id) << ": failed to request set status " << std::hex << status << ": " << err;
964 throw std::runtime_error(str.str());
968 struct range_sort_compare {
969 bool operator () (const std::string &s1, const std::string &s2) {
970 unsigned char *id1 = (unsigned char *)s1.data();
971 unsigned char *id2 = (unsigned char *)s2.data();
973 int cmp = dnet_id_cmp_str(id1, id2);
975 return cmp < 0;
979 std::vector<std::string> session::read_data_range(struct dnet_io_attr &io, int group_id, uint64_t cflags)
981 struct dnet_range_data *data;
982 uint64_t num = 0;
983 uint32_t ioflags = io.flags;
984 int err;
986 data = dnet_read_range(m_session, &io, group_id, cflags, &err);
987 if (!data && err) {
988 std::ostringstream str;
989 str << "Failed to read range data object: group: " << group_id <<
990 ", key: " << dnet_dump_id_str(io.id) <<
991 ", size: " << io.size << ": err: " << strerror(-err) << ": " << err;
992 throw std::runtime_error(str.str());
995 std::vector<std::string> ret;
997 if (data) {
998 for (int i = 0; i < err; ++i) {
999 struct dnet_range_data *d = &data[i];
1000 char *data = (char *)d->data;
1002 if (!(ioflags & DNET_IO_FLAGS_NODATA)) {
1003 while (d->size) {
1004 struct dnet_io_attr *io = (struct dnet_io_attr *)data;
1006 dnet_convert_io_attr(io);
1008 std::string str;
1010 str.append((char *)io->id, DNET_ID_SIZE);
1011 str.append((char *)&io->size, sizeof(io->size));
1012 str.append((const char *)(io + 1), io->size);
1014 ret.push_back(str);
1016 data += sizeof(struct dnet_io_attr) + io->size;
1017 d->size -= sizeof(struct dnet_io_attr) + io->size;
1019 } else {
1020 if (d->size != sizeof(struct dnet_io_attr)) {
1021 std::ostringstream str;
1022 str << "Incorrect data size: d->size = " << d->size <<
1023 "sizeof = " << sizeof(struct dnet_io_attr);
1024 throw std::runtime_error(str.str());
1026 struct dnet_io_attr *rep = (struct dnet_io_attr *)data;
1027 num += rep->num;
1030 free(d->data);
1033 free(data);
1035 if (ioflags & DNET_IO_FLAGS_NODATA) {
1036 std::ostringstream str;
1037 str << num;
1038 ret.push_back(str.str());
1042 return ret;
1045 std::vector<struct dnet_io_attr> session::remove_data_range(struct dnet_io_attr &io, int group_id, uint64_t cflags)
1047 struct dnet_io_attr *retp;
1048 int ret_num;
1049 int err;
1051 retp = dnet_remove_range(m_session, &io, group_id, cflags, &ret_num, &err);
1053 if (!retp && err) {
1054 std::ostringstream str;
1055 str << "Failed to read range data object: group: " << group_id <<
1056 ", key: " << dnet_dump_id_str(io.id) <<
1057 ", size: " << io.size << ": err: " << strerror(-err) << ": " << err;
1058 throw std::runtime_error(str.str());
1061 std::vector<struct dnet_io_attr> ret;;
1063 if (retp) {
1064 for (int i = 0; i < ret_num; ++i) {
1065 ret.push_back(retp[i]);
1068 free(retp);
1071 return ret;
1074 std::string session::write_prepare(const struct dnet_id &id, const std::string &str, uint64_t remote_offset,
1075 uint64_t psize, uint64_t cflags, unsigned int ioflags)
1077 struct dnet_io_control ctl;
1079 memset(&ctl, 0, sizeof(ctl));
1081 ctl.cflags = cflags;
1082 ctl.data = str.data();
1084 ctl.io.flags = ioflags | DNET_IO_FLAGS_PREPARE | DNET_IO_FLAGS_PLAIN_WRITE;
1085 ctl.io.offset = remote_offset;
1086 ctl.io.size = str.size();
1087 ctl.io.type = id.type;
1088 ctl.io.num = psize;
1090 memcpy(&ctl.id, &id, sizeof(id));
1092 ctl.fd = -1;
1094 char *result = NULL;
1095 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
1096 if (err < 0) {
1097 std::ostringstream string;
1098 string << dnet_dump_id(&ctl.id) << ": write_prepare: size: " << str.size() << ", err: " << err;
1099 throw std::runtime_error(string.str());
1102 std::string ret(result, err);
1103 free(result);
1105 return ret;
1108 std::string session::write_prepare(const std::string &remote, const std::string &str, uint64_t remote_offset,
1109 uint64_t psize, uint64_t cflags, unsigned int ioflags, int type)
1111 struct dnet_id id;
1112 memset(&id, 0, sizeof(id));
1113 transform(remote, id);
1114 id.type = type;
1115 return write_prepare(id, str, remote_offset, psize, cflags, ioflags);
1118 std::string session::write_commit(const struct dnet_id &id, const std::string &str, uint64_t remote_offset, uint64_t csize,
1119 uint64_t cflags, unsigned int ioflags)
1121 struct dnet_io_control ctl;
1123 memset(&ctl, 0, sizeof(ctl));
1125 ctl.cflags = cflags;
1126 ctl.data = str.data();
1128 ctl.io.flags = ioflags | DNET_IO_FLAGS_COMMIT | DNET_IO_FLAGS_PLAIN_WRITE;
1129 ctl.io.offset = remote_offset;
1130 ctl.io.size = str.size();
1131 ctl.io.type = id.type;
1132 ctl.io.num = csize;
1134 memcpy(&ctl.id, &id, sizeof(id));
1136 ctl.fd = -1;
1138 char *result = NULL;
1139 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
1140 if (err < 0) {
1141 std::ostringstream string;
1142 string << dnet_dump_id(&ctl.id) << ": write_commit: size: " << str.size() << ", err: " << err;
1143 throw std::runtime_error(string.str());
1146 std::string ret(result, err);
1147 free(result);
1149 return ret;
1152 std::string session::write_commit(const std::string &remote, const std::string &str, uint64_t remote_offset, uint64_t csize,
1153 uint64_t cflags, unsigned int ioflags, int type)
1155 struct dnet_id id;
1156 memset(&id, 0, sizeof(id));
1157 transform(remote, id);
1158 id.type = type;
1159 return write_commit(id, str, remote_offset, csize, cflags, ioflags);
1162 std::string session::write_plain(const struct dnet_id &id, const std::string &str, uint64_t remote_offset,
1163 uint64_t cflags, unsigned int ioflags)
1165 struct dnet_io_control ctl;
1167 memset(&ctl, 0, sizeof(ctl));
1169 ctl.cflags = cflags;
1170 ctl.data = str.data();
1172 ctl.io.flags = ioflags | DNET_IO_FLAGS_PLAIN_WRITE;
1173 ctl.io.offset = remote_offset;
1174 ctl.io.size = str.size();
1175 ctl.io.type = id.type;
1177 memcpy(&ctl.id, &id, sizeof(id));
1179 ctl.fd = -1;
1181 char *result = NULL;
1182 int err = dnet_write_data_wait(m_session, &ctl, (void **)&result);
1183 if (err < 0) {
1184 std::ostringstream string;
1185 string << dnet_dump_id(&ctl.id) << ": write_plain: size: " << str.size() << ", err: " << err;
1186 throw std::runtime_error(string.str());
1189 std::string ret(result, err);
1190 free(result);
1192 return ret;
1195 std::string session::write_plain(const std::string &remote, const std::string &str, uint64_t remote_offset,
1196 uint64_t cflags, unsigned int ioflags, int type)
1198 struct dnet_id id;
1199 memset(&id, 0, sizeof(id));
1200 transform(remote, id);
1201 id.type = type;
1202 return write_plain(id, str, remote_offset, cflags, ioflags);
1205 std::vector<std::pair<struct dnet_id, struct dnet_addr> > session::get_routes()
1207 std::vector<std::pair<struct dnet_id, struct dnet_addr> > res;
1208 struct dnet_id *ids = NULL;
1209 struct dnet_addr *addrs = NULL;
1211 int count = 0;
1213 count = dnet_get_routes(m_session, &ids, &addrs);
1215 if (count > 0) {
1216 for (int i = 0; i < count; ++i) {
1217 res.push_back(std::make_pair(ids[i], addrs[i]));
1221 if (ids)
1222 free(ids);
1224 if (addrs)
1225 free(addrs);
1227 return res;
1230 std::string session::request(struct dnet_id *id, struct sph *sph, bool lock)
1232 std::string ret_str;
1234 void *ret = NULL;
1235 int err;
1237 if (lock)
1238 err = dnet_send_cmd(m_session, id, sph, &ret);
1239 else
1240 err = dnet_send_cmd_nolock(m_session, id, sph, &ret);
1242 if (err < 0) {
1243 std::ostringstream str;
1245 str << dnet_dump_id(id) << ": failed to send request: " << strerror(-err) << ": " << err;
1246 throw std::runtime_error(str.str());
1249 if (ret && err) {
1250 try {
1251 ret_str.assign((char *)ret, err);
1252 } catch (...) {
1253 free(ret);
1254 throw;
1256 free(ret);
1259 return ret_str;
1262 std::string session::raw_exec(struct dnet_id *id, const struct sph *orig_sph,
1263 const std::string &event, const std::string &data, const std::string &binary, bool lock)
1265 std::vector<char> vec(event.size() + data.size() + binary.size() + sizeof(struct sph));
1266 std::string ret_str;
1268 struct sph *sph = (struct sph *)&vec[0];
1270 memset(sph, 0, sizeof(struct sph));
1271 if (orig_sph) {
1272 *sph = *orig_sph;
1273 sph->flags &= ~DNET_SPH_FLAGS_SRC_BLOCK;
1274 } else if (id) {
1275 sph->flags = DNET_SPH_FLAGS_SRC_BLOCK;
1276 memcpy(sph->src.id, id->id, sizeof(sph->src.id));
1279 sph->data_size = data.size();
1280 sph->binary_size = binary.size();
1281 sph->event_size = event.size();
1283 memcpy(sph->data, event.data(), event.size());
1284 memcpy(sph->data + event.size(), data.data(), data.size());
1285 memcpy(sph->data + event.size() + data.size(), binary.data(), binary.size());
1287 return request(id, sph, lock);
1290 std::string session::exec_locked(struct dnet_id *id, const std::string &event, const std::string &data, const std::string &binary)
1292 return raw_exec(id, NULL, event, data, binary, true);
1295 std::string session::exec_unlocked(struct dnet_id *id, const std::string &event, const std::string &data, const std::string &binary)
1297 return raw_exec(id, NULL, event, data, binary, false);
1300 std::string session::push_locked(struct dnet_id *id, const struct sph &sph, const std::string &event,
1301 const std::string &data, const std::string &binary)
1303 return raw_exec(id, &sph, event, data, binary, true);
1306 std::string session::push_unlocked(struct dnet_id *id, const struct sph &sph, const std::string &event,
1307 const std::string &data, const std::string &binary)
1309 return raw_exec(id, &sph, event, data, binary, false);
1312 void session::reply(const struct sph &orig_sph, const std::string &event, const std::string &data, const std::string &binary)
1314 std::vector<char> vec(event.size() + data.size() + binary.size() + sizeof(struct sph));
1315 std::string ret_str;
1317 struct sph *sph = (struct sph *)&vec[0];
1319 *sph = orig_sph;
1321 sph->data_size = data.size();
1322 sph->binary_size = binary.size();
1323 sph->event_size = event.size();
1325 memcpy(sph->data, event.data(), event.size());
1326 memcpy(sph->data + event.size(), data.data(), data.size());
1327 memcpy(sph->data + event.size() + data.size(), binary.data(), binary.size());
1329 struct dnet_id id;
1330 dnet_setup_id(&id, 0, sph->src.id);
1331 id.type = 0;
1333 request(&id, sph, false);
1336 namespace {
1337 bool dnet_io_attr_compare(const struct dnet_io_attr &io1, const struct dnet_io_attr &io2) {
1338 int cmp;
1340 cmp = dnet_id_cmp_str(io1.id, io2.id);
1341 return cmp < 0;
1345 std::vector<std::string> session::bulk_read(const std::vector<struct dnet_io_attr> &ios, uint64_t cflags)
1347 struct dnet_range_data *data;
1348 int num, *g, err;
1350 num = dnet_mix_states(m_session, NULL, &g);
1351 if (num < 0)
1352 throw std::runtime_error("could not fetch groups: " + std::string(strerror(num)));
1354 std::vector<int> groups;
1355 try {
1356 groups.assign(g, g + num);
1357 free(g);
1358 } catch (...) {
1359 free(g);
1360 throw;
1363 std::vector<struct dnet_io_attr> tmp_ios = ios;
1364 std::sort(tmp_ios.begin(), tmp_ios.end(), dnet_io_attr_compare);
1366 std::vector<std::string> ret;
1368 for (std::vector<int>::iterator group = groups.begin(); group != groups.end(); ++group) {
1369 if (!tmp_ios.size())
1370 break;
1372 data = dnet_bulk_read(m_session, (struct dnet_io_attr *)(&tmp_ios[0]), tmp_ios.size(), *group, cflags, &err);
1373 if (!data && err) {
1374 std::ostringstream str;
1375 str << "Failed to read bulk data: group: " << *group <<
1376 ": err: " << strerror(-err) << ": " << err;
1377 throw std::runtime_error(str.str());
1380 if (data) {
1381 for (int i = 0; i < err; ++i) {
1382 struct dnet_range_data *d = &data[i];
1383 char *data = (char *)d->data;
1385 while (d->size) {
1386 struct dnet_io_attr *io = (struct dnet_io_attr *)data;
1388 for (std::vector<struct dnet_io_attr>::iterator it = tmp_ios.begin(); it != tmp_ios.end(); ++it) {
1389 int cmp = dnet_id_cmp_str(it->id, io->id);
1391 if (cmp == 0) {
1392 tmp_ios.erase(it);
1393 break;
1397 dnet_convert_io_attr(io);
1399 uint64_t size = dnet_bswap64(io->size);
1401 std::string str;
1403 str.append((char *)io->id, DNET_ID_SIZE);
1404 str.append((char *)&size, 8);
1405 str.append((const char *)(io + 1), io->size);
1407 ret.push_back(str);
1409 data += sizeof(struct dnet_io_attr) + io->size;
1410 d->size -= sizeof(struct dnet_io_attr) + io->size;
1413 free(d->data);
1416 free(data);
1420 return ret;
1423 std::vector<std::string> session::bulk_read(const std::vector<std::string> &keys, uint64_t cflags)
1425 std::vector<struct dnet_io_attr> ios;
1426 struct dnet_io_attr io;
1427 memset(&io, 0, sizeof(io));
1429 ios.reserve(keys.size());
1431 for (size_t i = 0; i < keys.size(); ++i) {
1432 struct dnet_id id;
1434 transform(keys[i], id);
1435 memcpy(io.id, id.id, sizeof(io.id));
1436 ios.push_back(io);
1439 return bulk_read(ios, cflags);
1442 std::string session::bulk_write(const std::vector<struct dnet_io_attr> &ios, const std::vector<std::string> &data, uint64_t cflags)
1444 std::vector<struct dnet_io_control> ctls;
1445 unsigned int i;
1446 int err;
1448 if (ios.size() != data.size()) {
1449 std::ostringstream string;
1450 string << "BULK_WRITE: ios doesn't meet data: io.size: " << ios.size() << ", data.size: " << data.size();
1451 throw std::runtime_error(string.str());
1454 ctls.reserve(ios.size());
1456 for(i = 0; i < ios.size(); ++i) {
1457 struct dnet_io_control ctl;
1458 memset(&ctl, 0, sizeof(ctl));
1460 ctl.cflags = cflags;
1461 ctl.data = data[i].data();
1463 ctl.io = ios[i];
1465 dnet_setup_id(&ctl.id, 0, (unsigned char *)ios[i].id);
1466 ctl.id.type = ios[i].type;
1468 ctl.fd = -1;
1470 ctls.push_back(ctl);
1473 struct dnet_range_data ret = dnet_bulk_write(m_session, &ctls[0], ctls.size(), &err);
1474 if (err < 0) {
1475 std::ostringstream string;
1476 string << "BULK_WRITE: size: " << ret.size << ", err: " << err;
1477 throw std::runtime_error(string.str());
1480 std::string ret_str((const char *)ret.data, ret.size);
1481 free(ret.data);
1483 return ret_str;
1486 struct dnet_node * session::get_node()
1488 return m_node->m_node;