Only work with 0.9.4 cocaine
[elliptics.git] / bindings / cpp / node.cpp
blob0a5f055995a8e33b11639aa82df5192fb0e82b69
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #define _XOPEN_SOURCE 600
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/socket.h>
21 #include <sys/mman.h>
22 #include <sys/wait.h>
24 #include <errno.h>
25 #include <ctype.h>
26 #include <dirent.h>
27 #include <fcntl.h>
28 #include <pthread.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <unistd.h>
34 #include <algorithm>
35 #include <iostream>
36 #include <stdexcept>
37 #include <string>
38 #include <sstream>
39 #include <vector>
41 #include <boost/algorithm/string.hpp>
42 #include <elliptics/cppdef.h>
44 using namespace ioremap::elliptics;
46 node::node(logger &l) : m_node(NULL), m_log(NULL)
48 struct dnet_config cfg;
50 memset(&cfg, 0, sizeof(cfg));
52 cfg.sock_type = SOCK_STREAM;
53 cfg.proto = IPPROTO_TCP;
54 cfg.wait_timeout = 5;
55 cfg.check_timeout = 20;
57 m_log = reinterpret_cast<logger *>(l.clone());
58 cfg.log = m_log->get_dnet_log();
60 snprintf(cfg.addr, sizeof(cfg.addr), "0.0.0.0");
61 snprintf(cfg.port, sizeof(cfg.port), "0");
63 m_node = dnet_node_create(&cfg);
64 if (!m_node) {
65 delete m_log;
66 throw std::bad_alloc();
70 node::node(logger &l, struct dnet_config &cfg) : m_node(NULL), m_log(NULL)
72 cfg.sock_type = SOCK_STREAM;
73 cfg.proto = IPPROTO_TCP;
75 m_log = reinterpret_cast<logger *>(l.clone());
76 cfg.log = m_log->get_dnet_log();
78 snprintf(cfg.addr, sizeof(cfg.addr), "0.0.0.0");
79 snprintf(cfg.port, sizeof(cfg.port), "0");
81 m_node = dnet_node_create(&cfg);
82 if (!m_node) {
83 delete m_log;
84 throw std::bad_alloc();
88 node::node(logger &l, const std::string &config_path) : m_node(NULL), m_log(NULL)
90 struct dnet_config cfg;
91 memset(&cfg, 0, sizeof(struct dnet_config));
93 cfg.sock_type = SOCK_STREAM;
94 cfg.proto = IPPROTO_TCP;
96 m_log = reinterpret_cast<logger *>(l.clone());
97 cfg.log = m_log->get_dnet_log();
99 std::list<addr_tuple> remotes;
100 std::vector<int> groups;
102 parse_config(config_path, cfg, remotes, groups, cfg.log->log_level);
104 m_node = dnet_node_create(&cfg);
105 if (!m_node) {
106 delete m_log;
107 throw std::bad_alloc();
110 add_groups(groups);
111 for (std::list<addr_tuple>::iterator it = remotes.begin(); it != remotes.end(); ++it) {
112 try {
113 add_remote(it->host.c_str(), it->port, it->family);
114 } catch (...) {
115 continue;
120 node::~node()
122 dnet_node_destroy(m_node);
123 delete m_log;
126 void node::parse_config(const std::string &path, struct dnet_config &cfg,
127 std::list<addr_tuple> &remotes,
128 std::vector<int> &groups,
129 int &log_level)
131 std::ifstream in(path.c_str());
132 std::string line;
133 int line_num = 0;
135 while (!in.eof() && in.good()) {
136 line.resize(1024);
138 in.getline((char *)line.data(), line.size());
139 size_t len = in.gcount();
141 line.resize(len);
143 if (in.eof() || !in.good())
144 break;
146 boost::trim(line);
147 line_num++;
149 if (line.size() < 3 || line.data()[0] == '#')
150 continue;
152 std::vector<std::string> strs;
153 boost::split(strs, line, boost::is_any_of("="));
155 std::string key = strs[0];
156 boost::trim(key);
158 if (strs.size() != 2) {
159 std::ostringstream str;
160 str << path << ": invalid elliptics config: line: " << line_num <<
161 ", key: " << key << "': string is broken: size: " << strs.size();
162 throw std::runtime_error(str.str());
164 std::string value = strs[1];
165 boost::trim(value);
167 if (key == "remote") {
168 std::vector<std::string> rem;
169 boost::split(rem, value, boost::is_any_of(" "));
171 for (std::vector<std::string>::iterator it = rem.begin(); it != rem.end(); ++it) {
172 std::string addr_str = *it;
173 if (dnet_parse_addr((char *)addr_str.c_str(), &cfg)) {
174 std::ostringstream str;
175 str << path << ": invalid elliptics config: '" << key << "' ";
176 str << path << ": invalid elliptics config: line: " << line_num <<
177 ", key: '" << key << "': remote addr is invalid";
178 throw std::runtime_error(str.str());
181 addr_tuple addr(cfg.addr, atoi(cfg.port), cfg.family);
182 remotes.push_back(addr);
186 if (key == "groups") {
187 std::vector<std::string> gr;
188 boost::split(gr, value, boost::is_any_of(":"));
190 for (std::vector<std::string>::iterator it = gr.begin(); it != gr.end(); ++it) {
191 int group = atoi(it->c_str());
193 if (group != 0)
194 groups.push_back(group);
198 if (key == "check_timeout")
199 cfg.check_timeout = strtoul(value.c_str(), NULL, 0);
200 if (key == "wait_timeout")
201 cfg.wait_timeout = strtoul(value.c_str(), NULL, 0);
202 if (key == "log_level")
203 log_level = strtoul(value.c_str(), NULL, 0);
207 void node::add_groups(std::vector<int> &groups)
209 if (dnet_node_set_groups(m_node, (int *)&groups[0], groups.size()))
210 throw std::bad_alloc();
211 this->groups = groups;
214 void node::add_remote(const char *addr, const int port, const int family)
216 struct dnet_config cfg;
217 int err;
219 memset(&cfg, 0, sizeof(cfg));
221 cfg.family = family;
222 snprintf(cfg.addr, sizeof(cfg.addr), "%s", addr);
223 snprintf(cfg.port, sizeof(cfg.port), "%d", port);
225 err = dnet_add_state(m_node, &cfg);
226 if (err) {
227 std::ostringstream str;
228 str << "Failed to add remote addr " << addr << ":" << port << ": " << err;
229 throw std::runtime_error(str.str());
233 void node::read_file(struct dnet_id &id, const std::string &file, uint64_t offset, uint64_t size)
235 int err;
237 err = dnet_read_file_id(m_node, file.c_str(), &id, offset, size);
238 if (err) {
239 std::ostringstream str;
240 str << dnet_dump_id(&id) << ": READ: " << file << ": offset: " << offset << ", size: " << size << ": " << err;
241 throw std::runtime_error(str.str());
245 void node::read_file(const std::string &remote, const std::string &file, uint64_t offset, uint64_t size, int type)
247 int err;
249 err = dnet_read_file(m_node, file.c_str(), remote.data(), remote.size(), offset, size, type);
250 if (err) {
251 struct dnet_id id;
252 transform(remote, id);
253 id.type = 0;
255 std::ostringstream str;
256 str << dnet_dump_id(&id) << ": READ: " << file << ": offset: " << offset << ", size: " << size << ": " << err;
257 throw std::runtime_error(str.str());
261 void node::write_file(struct dnet_id &id, const std::string &file, uint64_t local_offset,
262 uint64_t offset, uint64_t size, uint64_t cflags, unsigned int ioflags)
264 int err = dnet_write_file_id(m_node, file.c_str(), &id, local_offset, offset, size, cflags, ioflags);
265 if (err) {
266 std::ostringstream str;
267 str << dnet_dump_id(&id) << ": WRITE: " << file << ", local_offset: " << local_offset <<
268 ", offset: " << offset << ", size: " << size << ": " << err;
269 throw std::runtime_error(str.str());
272 void node::write_file(const std::string &remote, const std::string &file, uint64_t local_offset, uint64_t offset, uint64_t size,
273 uint64_t cflags, unsigned int ioflags, int type)
275 int err = dnet_write_file(m_node, file.c_str(), remote.data(), remote.size(),
276 local_offset, offset, size, cflags, ioflags, type);
277 if (err) {
278 struct dnet_id id;
279 transform(remote, id);
280 id.type = 0;
282 std::ostringstream str;
283 str << dnet_dump_id(&id) << ": WRITE: " << file << ", local_offset: " << local_offset <<
284 ", offset: " << offset << ", size: " << size << ": " << err;
285 throw std::runtime_error(str.str());
289 std::string node::read_data_wait(struct dnet_id &id, uint64_t offset, uint64_t size,
290 uint64_t cflags, uint32_t ioflags)
292 struct dnet_io_attr io;
293 int err;
295 memset(&io, 0, sizeof(io));
296 io.size = size;
297 io.offset = offset;
298 io.flags = ioflags;
299 io.type = id.type;
301 memcpy(io.id, id.id, DNET_ID_SIZE);
302 memcpy(io.parent, id.id, DNET_ID_SIZE);
304 void *data = dnet_read_data_wait(m_node, &id, &io, cflags, &err);
305 if (!data) {
306 std::ostringstream str;
307 str << dnet_dump_id(&id) << ": READ: size: " << size << ": err: " << strerror(-err) << ": " << err;
308 throw std::runtime_error(str.str());
311 std::string ret = std::string((const char *)data + sizeof(struct dnet_io_attr), io.size - sizeof(struct dnet_io_attr));
312 free(data);
314 return ret;
317 std::string node::read_data_wait(const std::string &remote, uint64_t offset, uint64_t size,
318 uint64_t cflags, uint32_t ioflags, int type)
320 struct dnet_id id;
322 transform(remote, id);
323 id.type = type;
325 return read_data_wait(id, offset, size, cflags, ioflags);
328 void node::prepare_latest(struct dnet_id &id, uint64_t cflags, std::vector<int> &groups)
330 struct dnet_read_latest_prepare pr;
331 int err;
333 memset(&pr, 0, sizeof(struct dnet_read_latest_prepare));
335 pr.n = m_node;
336 pr.id = id;
337 pr.cflags = cflags;
339 pr.group = (int *)malloc(groups.size() * sizeof(int));
340 if (!pr.group) {
341 std::ostringstream str;
343 str << dnet_dump_id(&id) << ": prepare_latest: allocation failure: group num: " << groups.size();
344 throw std::runtime_error(str.str());
346 pr.group_num = groups.size();
348 for (unsigned i = 0; i < groups.size(); ++i)
349 pr.group[i] = groups[i];
351 err = dnet_read_latest_prepare(&pr);
352 if (!err) {
353 try {
354 groups.clear();
356 for (int i = 0; i < pr.group_num; ++i)
357 groups.push_back(pr.group[i]);
358 } catch (...) {
359 free(pr.group);
360 throw;
364 free(pr.group);
366 if (!groups.size())
367 err = -ENOENT;
369 if (err) {
370 std::ostringstream str;
372 str << dnet_dump_id(&id) << ": prepare_latest: groups: " << groups.size() << ": err: " << strerror(-err) << ": " << err;
373 throw std::runtime_error(str.str());
377 std::string node::read_latest(struct dnet_id &id, uint64_t offset, uint64_t size,
378 uint64_t cflags, uint32_t ioflags)
380 struct dnet_io_attr io;
381 void *data;
382 int err;
384 memset(&io, 0, sizeof(io));
385 io.size = size;
386 io.offset = offset;
387 io.flags = ioflags;
388 io.type = id.type;
389 io.num = groups.size();
391 memcpy(io.id, id.id, DNET_ID_SIZE);
392 memcpy(io.parent, id.id, DNET_ID_SIZE);
394 err = dnet_read_latest(m_node, &id, &io, cflags, &data);
395 if (err < 0) {
396 std::ostringstream str;
397 str << dnet_dump_id(&id) << ": READ: size: " << size << ": err: " << strerror(-err) << ": " << err;
398 throw std::runtime_error(str.str());
401 std::string ret = std::string((const char *)data + sizeof(struct dnet_io_attr), io.size - sizeof(struct dnet_io_attr));
402 free(data);
404 return ret;
407 std::string node::read_latest(const std::string &remote, uint64_t offset, uint64_t size,
408 uint64_t cflags, uint32_t ioflags, int type)
410 struct dnet_id id;
412 transform(remote, id);
413 id.type = type;
415 return read_latest(id, offset, size, cflags, ioflags);
418 std::string node::write_cache(struct dnet_id &id, const std::string &str,
419 uint64_t cflags, unsigned int ioflags, long timeout)
421 struct dnet_io_control ctl;
423 memset(&ctl, 0, sizeof(ctl));
425 ctl.cflags = cflags;
426 ctl.data = str.data();
428 ctl.io.flags = ioflags | DNET_IO_FLAGS_CACHE;
429 ctl.io.start = timeout;
430 ctl.io.size = str.size();
431 ctl.io.type = id.type;
432 ctl.io.num = str.size();
434 memcpy(&ctl.id, &id, sizeof(struct dnet_id));
436 ctl.fd = -1;
438 char *result = NULL;
439 int err = dnet_write_data_wait(m_node, &ctl, (void **)&result);
440 if (err < 0) {
441 std::ostringstream string;
442 string << dnet_dump_id(&id) << ": WRITE: size: " << str.size() << ", err: " << err;
443 throw std::runtime_error(string.str());
446 std::string ret((const char *)result, err);
447 free(result);
449 return ret;
452 std::string node::write_cache(const std::string &key, const std::string &str,
453 uint64_t cflags, unsigned int ioflags, long timeout)
455 struct dnet_id id;
457 transform(key, id);
458 id.type = 0;
459 id.group_id = 0;
461 return write_cache(id, str, cflags, ioflags, timeout);
464 std::string node::write_data_wait(struct dnet_id &id, const std::string &str,
465 uint64_t remote_offset, uint64_t cflags, unsigned int ioflags)
467 struct dnet_io_control ctl;
469 memset(&ctl, 0, sizeof(ctl));
471 ctl.cflags = cflags;
472 ctl.data = str.data();
474 ctl.io.flags = ioflags;
475 ctl.io.offset = remote_offset;
476 ctl.io.size = str.size();
477 ctl.io.type = id.type;
478 ctl.io.num = str.size() + remote_offset;
480 memcpy(&ctl.id, &id, sizeof(struct dnet_id));
482 ctl.fd = -1;
484 char *result = NULL;
485 int err = dnet_write_data_wait(m_node, &ctl, (void **)&result);
486 if (err < 0) {
487 std::ostringstream string;
488 string << dnet_dump_id(&id) << ": WRITE: size: " << str.size() << ", err: " << err;
489 throw std::runtime_error(string.str());
492 std::string ret((const char *)result, err);
493 free(result);
495 return ret;
498 std::string node::write_data_wait(const std::string &remote, const std::string &str,
499 uint64_t remote_offset, uint64_t cflags, unsigned int ioflags, int type)
501 struct dnet_id id;
503 transform(remote, id);
504 id.type = type;
505 id.group_id = 0;
507 return write_data_wait(id, str, remote_offset, cflags, ioflags);
510 std::string node::lookup_addr(const std::string &remote, const int group_id)
512 char buf[128];
514 int err = dnet_lookup_addr(m_node, remote.data(), remote.size(), NULL, group_id, buf, sizeof(buf));
515 if (err < 0) {
516 std::ostringstream str;
517 str << "Failed to lookup in group " << group_id << ": key size: " << remote.size() << ", err: " << err;
518 throw std::runtime_error(str.str());
521 return std::string((const char *)buf, strlen(buf));
524 std::string node::lookup_addr(const struct dnet_id &id)
526 char buf[128];
528 int err = dnet_lookup_addr(m_node, NULL, 0, (struct dnet_id *)&id, id.group_id, buf, sizeof(buf));
529 if (err < 0) {
530 std::ostringstream str;
531 str << "Failed to lookup " << dnet_dump_id(&id) << ": err: " << err;
532 throw std::runtime_error(str.str());
535 return std::string((const char *)buf, strlen(buf));
538 std::string node::create_metadata(const struct dnet_id &id, const std::string &obj,
539 const std::vector<int> &groups, const struct timespec &ts)
541 struct dnet_metadata_control ctl;
542 struct dnet_meta_container mc;
543 int err;
545 memset(&mc, 0, sizeof(struct dnet_meta_container));
546 memset(&ctl, 0, sizeof(struct dnet_metadata_control));
548 ctl.obj = (char *)obj.data();
549 ctl.len = obj.size();
551 ctl.groups = (int *)&groups[0];
552 ctl.group_num = groups.size();
554 ctl.ts = ts;
555 ctl.id = id;
557 err = dnet_create_metadata(m_node, &ctl, &mc);
558 if (err) {
559 std::ostringstream str;
560 str << "Failed to create metadata: key: " << dnet_dump_id(&id) << ", err: " << err;
561 throw std::runtime_error(str.str());
564 std::string ret;
566 try {
567 ret.assign((char *)mc.data, mc.size);
568 } catch (...) {
569 free(mc.data);
570 throw;
573 free(mc.data);
574 return ret;
577 int node::write_metadata(const struct dnet_id &id, const std::string &obj,
578 const std::vector<int> &groups, const struct timespec &ts, uint64_t cflags)
580 int err;
581 std::string meta;
582 struct dnet_meta_container mc;
584 if (dnet_flags(m_node) & DNET_CFG_NO_META)
585 return 0;
587 meta = create_metadata(id, obj, groups, ts);
589 mc.data = (void *)meta.data();
590 mc.size = meta.size();
592 mc.id = id;
594 err = dnet_write_metadata(m_node, &mc, 1, cflags);
595 if (err) {
596 std::ostringstream str;
597 str << "Failed to write metadata: key: " << dnet_dump_id(&id) << ", err: " << err;
598 throw std::runtime_error(str.str());
601 return 0;
604 void node::transform(const std::string &data, struct dnet_id &id)
606 dnet_transform(m_node, (void *)data.data(), data.size(), &id);
609 void node::lookup(const struct dnet_id &id, const callback &c)
611 int err = dnet_lookup_object(m_node, (struct dnet_id *)&id, 0,
612 callback::complete_callback,
613 (void *)&c);
615 if (err) {
616 std::ostringstream str;
617 str << "Failed to lookup ID " << dnet_dump_id(&id) << ": " << err;
618 throw std::runtime_error(str.str());
622 void node::lookup(const std::string &data, const callback &c)
624 struct dnet_id id;
625 int error = -ENOENT, i, num, *g;
627 transform(data, id);
628 id.type = 0;
630 num = dnet_mix_states(m_node, &id, &g);
631 if (num < 0)
632 throw std::bad_alloc();
634 for (i=0; i<num; ++i) {
635 id.group_id = g[i];
637 try {
638 lookup(id, c);
639 } catch (...) {
640 continue;
643 error = 0;
644 break;
647 free(g);
649 if (error) {
650 std::ostringstream str;
651 str << "Failed to lookup data object: key: " << dnet_dump_id(&id);
652 throw std::runtime_error(str.str());
656 std::string node::lookup(const std::string &data)
658 struct dnet_id id;
659 int error = -ENOENT, i, num, *g;
660 std::string ret;
662 transform(data, id);
663 id.type = 0;
665 num = dnet_mix_states(m_node, &id, &g);
666 if (num < 0)
667 throw std::bad_alloc();
669 for (i=0; i<num; ++i) {
670 try {
671 callback l;
672 id.group_id = g[i];
674 lookup(id, l);
675 ret = l.wait();
677 if (ret.size() < sizeof(struct dnet_addr) + sizeof(struct dnet_cmd)) {
678 std::stringstream str;
680 str << dnet_dump_id(&id) << ": failed to receive lookup request";
681 throw std::runtime_error(str.str());
683 /* reply parsing examaple */
684 #if 0
685 struct dnet_addr *addr = (struct dnet_addr *)ret.data();
686 struct dnet_cmd *cmd = (struct dnet_cmd *)(addr + 1);
688 if (cmd->size > sizeof(struct dnet_addr_attr)) {
689 struct dnet_addr_attr *a = (struct dnet_addr_attr *)(cmd + 1);
690 struct dnet_file_info *info = (struct dnet_file_info *)(a + 1);
692 dnet_convert_addr_attr(a);
693 dnet_convert_file_info(info);
695 #endif
696 dnet_log_raw(m_node, DNET_LOG_DEBUG, "%s: %s: %zu bytes\n", dnet_dump_id(&id), data.c_str(), ret.size());
697 error = 0;
698 break;
699 } catch (const std::exception &e) {
700 dnet_log_raw(m_node, DNET_LOG_ERROR, "%s: %s : %s\n", dnet_dump_id(&id), e.what(), data.c_str());
701 continue;
705 free(g);
707 if (error) {
708 std::ostringstream str;
709 str << data << ": could not find object";
711 throw std::runtime_error(str.str());
714 return ret;
717 std::string node::lookup(const struct dnet_id &id)
719 int error = -ENOENT;
720 std::string ret;
722 try {
723 callback l;
725 lookup(id, l);
726 ret = l.wait();
728 if (ret.size() < sizeof(struct dnet_addr) + sizeof(struct dnet_cmd)) {
729 std::stringstream str;
731 str << dnet_dump_id(&id) << ": failed to receive lookup request";
732 throw std::runtime_error(str.str());
735 dnet_log_raw(m_node, DNET_LOG_DEBUG, "%s: %zu bytes\n", dnet_dump_id(&id), ret.size());
736 error = 0;
737 } catch (const std::exception &e) {
738 dnet_log_raw(m_node, DNET_LOG_ERROR, "%s: %s\n", dnet_dump_id(&id), e.what());
741 if (error) {
742 std::ostringstream str;
743 str << dnet_dump_id(&id) << ": could not find object";
745 throw std::runtime_error(str.str());
748 return ret;
751 void node::remove_raw(struct dnet_id &id, uint64_t cflags, uint64_t ioflags)
753 int err = -ENOENT;
754 std::vector<int> g = groups;
756 for (int i=0; i<(int)g.size(); ++i) {
757 id.group_id = g[i];
759 if (!dnet_remove_object_now(m_node, &id, cflags, ioflags))
760 err = 0;
763 if (err) {
764 std::ostringstream str;
765 str << dnet_dump_id(&id) << ": REMOVE: " << err;
766 throw std::runtime_error(str.str());
770 void node::remove(struct dnet_id &id)
772 remove_raw(id, 0, 0);
775 void node::remove_raw(const std::string &data, int type, uint64_t cflags, uint64_t ioflags)
777 struct dnet_id id;
779 transform(data, id);
780 id.type = type;
782 remove_raw(id, cflags, ioflags);
785 void node::remove(const std::string &data, int type)
787 remove_raw(data, type, 0, 0);
790 std::string node::stat_log()
792 callback c;
793 std::string ret;
794 int err;
796 err = dnet_request_stat(m_node, NULL, DNET_CMD_STAT, 0,
797 callback::complete_callback, (void *)&c);
798 if (err < 0) {
799 std::ostringstream str;
800 str << "Failed to request statistics: " << err;
801 throw std::runtime_error(str.str());
804 ret = c.wait(err);
806 /* example reply parsing */
807 #if 0
808 float la[3];
809 const void *data = ret.data();
810 int size = ret.size();
811 char id_str[DNET_ID_SIZE*2 + 1];
812 char addr_str[128];
814 while (size) {
815 struct dnet_addr *addr = (struct dnet_addr *)data;
816 struct dnet_cmd *cmd = (struct dnet_cmd *)(addr + 1);
817 struct dnet_stat *st = (struct dnet_stat *)(cmd + 1);
819 dnet_convert_stat(st);
821 la[0] = (float)st->la[0] / 100.0;
822 la[1] = (float)st->la[1] / 100.0;
823 la[2] = (float)st->la[2] / 100.0;
825 printf("<stat addr=\"%s\" id=\"%s\"><la>%.2f %.2f %.2f</la>"
826 "<memtotal>%llu KB</memtotal><memfree>%llu KB</memfree><memcached>%llu KB</memcached>"
827 "<storage_size>%llu MB</storage_size><available_size>%llu MB</available_size>"
828 "<files>%llu</files><fsid>0x%llx</fsid></stat>",
829 dnet_server_convert_dnet_addr_raw(addr, addr_str, sizeof(addr_str)),
830 dnet_dump_id_len_raw(cmd->id.id, DNET_ID_SIZE, id_str),
831 la[0], la[1], la[2],
832 (unsigned long long)st->vm_total,
833 (unsigned long long)st->vm_free,
834 (unsigned long long)st->vm_cached,
835 (unsigned long long)(st->frsize * st->blocks / 1024 / 1024),
836 (unsigned long long)(st->bavail * st->bsize / 1024 / 1024),
837 (unsigned long long)st->files, (unsigned long long)st->fsid);
838 printf("\n");
840 int sz = sizeof(*addr) + sizeof(*cmd) + cmd->size;
842 size -= sz;
843 data += sz;
845 #endif
847 if (ret.size() < sizeof(struct dnet_addr) + sizeof(struct dnet_cmd) + sizeof(struct dnet_stat))
848 throw std::runtime_error("Failed to request statistics: not enough data returned");
849 return ret;
852 int node::state_num(void)
854 return dnet_state_num(m_node);
857 int node::request_cmd(struct dnet_trans_control &ctl)
859 int err;
861 err = dnet_request_cmd(m_node, &ctl);
862 if (err < 0) {
863 std::ostringstream str;
864 str << dnet_dump_id(&ctl.id) << ": failed to request cmd: " << dnet_cmd_string(ctl.cmd) << ": " << err;
865 throw std::runtime_error(str.str());
868 return err;
871 void node::update_status(const char *saddr, const int port, const int family, struct dnet_node_status *status)
873 int err;
874 struct dnet_addr addr;
875 char sport[16];
877 memset(&addr, 0, sizeof(addr));
878 addr.addr_len = sizeof(addr.addr);
880 snprintf(sport, sizeof(sport), "%d", port);
882 err = dnet_fill_addr(&addr, saddr, sport, family, SOCK_STREAM, IPPROTO_TCP);
883 if (!err)
884 err = dnet_update_status(m_node, &addr, NULL, status);
886 if (err < 0) {
887 std::ostringstream str;
888 str << saddr << ":" << port << ": failed to request set status " << std::hex << status << ": " << err;
889 throw std::runtime_error(str.str());
893 void node::update_status(struct dnet_id &id, struct dnet_node_status *status)
895 int err;
897 err = dnet_update_status(m_node, NULL, &id, status);
898 if (err < 0) {
899 std::ostringstream str;
901 str << dnet_dump_id(&id) << ": failed to request set status " << std::hex << status << ": " << err;
902 throw std::runtime_error(str.str());
906 struct range_sort_compare {
907 bool operator () (const std::string &s1, const std::string &s2) {
908 unsigned char *id1 = (unsigned char *)s1.data();
909 unsigned char *id2 = (unsigned char *)s2.data();
911 int cmp = dnet_id_cmp_str(id1, id2);
913 return cmp < 0;
917 std::vector<std::string> node::read_data_range(struct dnet_io_attr &io, int group_id, uint64_t cflags)
919 struct dnet_range_data *data;
920 uint64_t num = 0;
921 uint32_t ioflags = io.flags;
922 int err;
924 data = dnet_read_range(m_node, &io, group_id, cflags, &err);
925 if (!data && err) {
926 std::ostringstream str;
927 str << "Failed to read range data object: group: " << group_id <<
928 ", key: " << dnet_dump_id_str(io.id) <<
929 ", size: " << io.size << ": err: " << strerror(-err) << ": " << err;
930 throw std::runtime_error(str.str());
933 std::vector<std::string> ret;
935 if (data) {
936 for (int i = 0; i < err; ++i) {
937 struct dnet_range_data *d = &data[i];
938 char *data = (char *)d->data;
940 if (!(ioflags & DNET_IO_FLAGS_NODATA)) {
941 while (d->size) {
942 struct dnet_io_attr *io = (struct dnet_io_attr *)data;
944 dnet_convert_io_attr(io);
946 uint64_t size = dnet_bswap64(io->size);
948 std::string str;
950 str.append((char *)io->id, DNET_ID_SIZE);
951 str.append((char *)&size, 8);
952 str.append((const char *)(io + 1), io->size);
954 ret.push_back(str);
956 data += sizeof(struct dnet_io_attr) + io->size;
957 d->size -= sizeof(struct dnet_io_attr) + io->size;
959 } else {
960 if (d->size != sizeof(struct dnet_io_attr)) {
961 std::ostringstream str;
962 str << "Incorrect data size: d->size = " << d->size <<
963 "sizeof = " << sizeof(struct dnet_io_attr);
964 throw std::runtime_error(str.str());
966 struct dnet_io_attr *rep = (struct dnet_io_attr *)data;
967 num += rep->num;
970 free(d->data);
973 free(data);
975 if (ioflags & DNET_IO_FLAGS_NODATA) {
976 std::ostringstream str;
977 str << num;
978 ret.push_back(str.str());
982 return ret;
985 std::vector<struct dnet_io_attr> node::remove_data_range(struct dnet_io_attr &io, int group_id, uint64_t cflags)
987 struct dnet_io_attr *retp;
988 int ret_num;
989 int err;
991 retp = dnet_remove_range(m_node, &io, group_id, cflags, &ret_num, &err);
993 if (!retp && err) {
994 std::ostringstream str;
995 str << "Failed to read range data object: group: " << group_id <<
996 ", key: " << dnet_dump_id_str(io.id) <<
997 ", size: " << io.size << ": err: " << strerror(-err) << ": " << err;
998 throw std::runtime_error(str.str());
1001 std::vector<struct dnet_io_attr> ret;;
1003 if (retp) {
1004 for (int i = 0; i < ret_num; ++i) {
1005 ret.push_back(retp[i]);
1008 free(retp);
1011 return ret;
1014 std::string node::write_prepare(const std::string &remote, const std::string &str, uint64_t remote_offset,
1015 uint64_t psize, uint64_t cflags, unsigned int ioflags, int type)
1017 struct dnet_io_control ctl;
1019 memset(&ctl, 0, sizeof(ctl));
1021 ctl.cflags = cflags;
1022 ctl.data = str.data();
1024 ctl.io.flags = ioflags | DNET_IO_FLAGS_PREPARE | DNET_IO_FLAGS_PLAIN_WRITE;
1025 ctl.io.offset = remote_offset;
1026 ctl.io.size = str.size();
1027 ctl.io.type = type;
1028 ctl.io.num = psize;
1030 transform(remote, ctl.id);
1031 ctl.id.type = type;
1032 ctl.id.group_id = 0;
1034 ctl.fd = -1;
1036 char *result = NULL;
1037 int err = dnet_write_data_wait(m_node, &ctl, (void **)&result);
1038 if (err < 0) {
1039 std::ostringstream string;
1040 string << dnet_dump_id(&ctl.id) << ": " << remote << ": write_prepare: size: " << str.size() << ", err: " << err;
1041 throw std::runtime_error(string.str());
1044 std::string ret(result, err);
1045 free(result);
1047 return ret;
1050 std::string node::write_commit(const std::string &remote, const std::string &str, uint64_t remote_offset, uint64_t csize,
1051 uint64_t cflags, unsigned int ioflags, int type)
1053 struct dnet_io_control ctl;
1055 memset(&ctl, 0, sizeof(ctl));
1057 ctl.cflags = cflags;
1058 ctl.data = str.data();
1060 ctl.io.flags = ioflags | DNET_IO_FLAGS_COMMIT | DNET_IO_FLAGS_PLAIN_WRITE;
1061 ctl.io.offset = remote_offset;
1062 ctl.io.size = str.size();
1063 ctl.io.type = type;
1064 ctl.io.num = csize;
1066 transform(remote, ctl.id);
1067 ctl.id.type = type;
1068 ctl.id.group_id = 0;
1070 ctl.fd = -1;
1072 char *result = NULL;
1073 int err = dnet_write_data_wait(m_node, &ctl, (void **)&result);
1074 if (err < 0) {
1075 std::ostringstream string;
1076 string << dnet_dump_id(&ctl.id) << ": " << remote << ": write_commit: size: " << str.size() << ", err: " << err;
1077 throw std::runtime_error(string.str());
1080 std::string ret(result, err);
1081 free(result);
1083 return ret;
1086 std::string node::write_plain(const std::string &remote, const std::string &str, uint64_t remote_offset,
1087 uint64_t cflags, unsigned int ioflags, int type)
1089 struct dnet_io_control ctl;
1091 memset(&ctl, 0, sizeof(ctl));
1093 ctl.cflags = cflags;
1094 ctl.data = str.data();
1096 ctl.io.flags = ioflags | DNET_IO_FLAGS_PLAIN_WRITE;
1097 ctl.io.offset = remote_offset;
1098 ctl.io.size = str.size();
1099 ctl.io.type = type;
1101 transform(remote, ctl.id);
1102 ctl.id.type = type;
1103 ctl.id.group_id = 0;
1105 ctl.fd = -1;
1107 char *result = NULL;
1108 int err = dnet_write_data_wait(m_node, &ctl, (void **)&result);
1109 if (err < 0) {
1110 std::ostringstream string;
1111 string << dnet_dump_id(&ctl.id) << ": " << remote << ": write_plain: size: " << str.size() << ", err: " << err;
1112 throw std::runtime_error(string.str());
1115 std::string ret(result, err);
1116 free(result);
1118 return ret;
1121 std::vector<std::pair<struct dnet_id, struct dnet_addr> > node::get_routes()
1123 std::vector<std::pair<struct dnet_id, struct dnet_addr> > res;
1124 struct dnet_id *ids = NULL;
1125 struct dnet_addr *addrs = NULL;
1127 int count = 0;
1129 count = dnet_get_routes(m_node, &ids, &addrs);
1131 if (count > 0) {
1132 for (int i = 0; i < count; ++i) {
1133 res.push_back(std::make_pair(ids[i], addrs[i]));
1137 if (ids)
1138 free(ids);
1140 if (addrs)
1141 free(addrs);
1143 return res;
1146 std::string node::request(struct dnet_id *id, struct sph *sph, bool lock)
1148 std::string ret_str;
1150 void *ret = NULL;
1151 int err;
1153 if (lock)
1154 err = dnet_send_cmd(m_node, id, sph, &ret);
1155 else
1156 err = dnet_send_cmd_nolock(m_node, id, sph, &ret);
1158 if (err < 0) {
1159 std::ostringstream str;
1161 str << dnet_dump_id(id) << ": failed to send request: " << strerror(-err) << ": " << err;
1162 throw std::runtime_error(str.str());
1165 if (ret && err) {
1166 try {
1167 ret_str.assign((char *)ret, err);
1168 } catch (...) {
1169 free(ret);
1170 throw;
1172 free(ret);
1175 return ret_str;
1178 std::string node::raw_exec(struct dnet_id *id, const struct sph *orig_sph,
1179 const std::string &event, const std::string &data, const std::string &binary, bool lock)
1181 std::vector<char> vec(event.size() + data.size() + binary.size() + sizeof(struct sph));
1182 std::string ret_str;
1184 struct sph *sph = (struct sph *)&vec[0];
1186 memset(sph, 0, sizeof(struct sph));
1187 if (orig_sph) {
1188 *sph = *orig_sph;
1189 sph->flags &= ~DNET_SPH_FLAGS_SRC_BLOCK;
1190 } else if (id) {
1191 sph->flags = DNET_SPH_FLAGS_SRC_BLOCK;
1192 memcpy(sph->src.id, id->id, sizeof(sph->src.id));
1195 sph->data_size = data.size();
1196 sph->binary_size = binary.size();
1197 sph->event_size = event.size();
1199 memcpy(sph->data, event.data(), event.size());
1200 memcpy(sph->data + event.size(), data.data(), data.size());
1201 memcpy(sph->data + event.size() + data.size(), binary.data(), binary.size());
1203 return request(id, sph, lock);
1206 std::string node::exec_locked(struct dnet_id *id, const std::string &event, const std::string &data, const std::string &binary)
1208 return raw_exec(id, NULL, event, data, binary, true);
1211 std::string node::exec_unlocked(struct dnet_id *id, const std::string &event, const std::string &data, const std::string &binary)
1213 return raw_exec(id, NULL, event, data, binary, false);
1216 std::string node::push_locked(struct dnet_id *id, const struct sph &sph, const std::string &event,
1217 const std::string &data, const std::string &binary)
1219 return raw_exec(id, &sph, event, data, binary, true);
1222 std::string node::push_unlocked(struct dnet_id *id, const struct sph &sph, const std::string &event,
1223 const std::string &data, const std::string &binary)
1225 return raw_exec(id, &sph, event, data, binary, false);
1228 void node::reply(const struct sph &orig_sph, const std::string &event, const std::string &data, const std::string &binary)
1230 std::vector<char> vec(event.size() + data.size() + binary.size() + sizeof(struct sph));
1231 std::string ret_str;
1233 struct sph *sph = (struct sph *)&vec[0];
1235 *sph = orig_sph;
1237 sph->data_size = data.size();
1238 sph->binary_size = binary.size();
1239 sph->event_size = event.size();
1241 memcpy(sph->data, event.data(), event.size());
1242 memcpy(sph->data + event.size(), data.data(), data.size());
1243 memcpy(sph->data + event.size() + data.size(), binary.data(), binary.size());
1245 struct dnet_id id;
1246 dnet_setup_id(&id, 0, sph->src.id);
1247 id.type = 0;
1249 request(&id, sph, false);
1252 namespace {
1253 bool dnet_io_attr_compare(const struct dnet_io_attr &io1, const struct dnet_io_attr &io2) {
1254 int cmp;
1256 cmp = dnet_id_cmp_str(io1.id, io2.id);
1257 return cmp < 0;
1261 std::vector<std::string> node::bulk_read(const std::vector<struct dnet_io_attr> &ios, uint64_t cflags)
1263 struct dnet_range_data *data;
1264 int num, *g, err;
1266 num = dnet_mix_states(m_node, NULL, &g);
1267 if (num < 0)
1268 throw std::runtime_error("could not fetch groups: " + std::string(strerror(num)));
1270 std::vector<int> groups;
1271 try {
1272 groups.assign(g, g + num);
1273 free(g);
1274 } catch (...) {
1275 free(g);
1276 throw;
1279 std::vector<struct dnet_io_attr> tmp_ios = ios;
1280 std::sort(tmp_ios.begin(), tmp_ios.end(), dnet_io_attr_compare);
1282 std::vector<std::string> ret;
1284 for (std::vector<int>::iterator group = groups.begin(); group != groups.end(); ++group) {
1285 if (!tmp_ios.size())
1286 break;
1288 data = dnet_bulk_read(m_node, (struct dnet_io_attr *)(&tmp_ios[0]), tmp_ios.size(), *group, cflags, &err);
1289 if (!data && err) {
1290 std::ostringstream str;
1291 str << "Failed to read bulk data: group: " << *group <<
1292 ": err: " << strerror(-err) << ": " << err;
1293 throw std::runtime_error(str.str());
1296 if (data) {
1297 for (int i = 0; i < err; ++i) {
1298 struct dnet_range_data *d = &data[i];
1299 char *data = (char *)d->data;
1301 while (d->size) {
1302 struct dnet_io_attr *io = (struct dnet_io_attr *)data;
1304 for (std::vector<struct dnet_io_attr>::iterator it = tmp_ios.begin(); it != tmp_ios.end(); ++it) {
1305 int cmp = dnet_id_cmp_str(it->id, io->id);
1307 if (cmp == 0) {
1308 tmp_ios.erase(it);
1309 break;
1313 dnet_convert_io_attr(io);
1315 uint64_t size = dnet_bswap64(io->size);
1317 std::string str;
1319 str.append((char *)io->id, DNET_ID_SIZE);
1320 str.append((char *)&size, 8);
1321 str.append((const char *)(io + 1), io->size);
1323 ret.push_back(str);
1325 data += sizeof(struct dnet_io_attr) + io->size;
1326 d->size -= sizeof(struct dnet_io_attr) + io->size;
1329 free(d->data);
1332 free(data);
1336 return ret;
1339 std::vector<std::string> node::bulk_read(const std::vector<std::string> &keys, uint64_t cflags)
1341 std::vector<struct dnet_io_attr> ios;
1342 struct dnet_io_attr io;
1343 memset(&io, 0, sizeof(io));
1345 ios.reserve(keys.size());
1347 for (size_t i = 0; i < keys.size(); ++i) {
1348 struct dnet_id id;
1350 transform(keys[i], id);
1351 memcpy(io.id, id.id, sizeof(io.id));
1352 ios.push_back(io);
1355 return bulk_read(ios, cflags);
1358 std::string node::bulk_write(const std::vector<struct dnet_io_attr> &ios, const std::vector<std::string> &data, uint64_t cflags)
1360 std::vector<struct dnet_io_control> ctls;
1361 unsigned int i;
1362 int err;
1364 if (ios.size() != data.size()) {
1365 std::ostringstream string;
1366 string << "BULK_WRITE: ios doesn't meet data: io.size: " << ios.size() << ", data.size: " << data.size();
1367 throw std::runtime_error(string.str());
1370 ctls.reserve(ios.size());
1372 for(i = 0; i < ios.size(); ++i) {
1373 struct dnet_io_control ctl;
1374 memset(&ctl, 0, sizeof(ctl));
1376 ctl.cflags = cflags;
1377 ctl.data = data[i].data();
1379 ctl.io = ios[i];
1381 dnet_setup_id(&ctl.id, 0, (unsigned char *)ios[i].id);
1382 ctl.id.type = ios[i].type;
1384 ctl.fd = -1;
1386 ctls.push_back(ctl);
1389 struct dnet_range_data ret = dnet_bulk_write(m_node, &ctls[0], ctls.size(), &err);
1390 if (err < 0) {
1391 std::ostringstream string;
1392 string << "BULK_WRITE: size: " << ret.size << ", err: " << err;
1393 throw std::runtime_error(string.str());
1396 std::string ret_str((const char *)ret.data, ret.size);
1397 free(ret.data);
1399 return ret_str;
1402 void node::set_timeouts(const int wait_timeout, const int check_timeout)
1404 dnet_set_timeouts(m_node, wait_timeout, check_timeout);