2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #define _XOPEN_SOURCE 600
18 #include <sys/types.h>
20 #include <sys/socket.h>
41 #include <boost/algorithm/string.hpp>
42 #include <elliptics/cppdef.h>
44 using namespace ioremap::elliptics
;
46 node::node(logger
&l
) : m_node(NULL
), m_log(NULL
)
48 struct dnet_config cfg
;
50 memset(&cfg
, 0, sizeof(cfg
));
52 cfg
.sock_type
= SOCK_STREAM
;
53 cfg
.proto
= IPPROTO_TCP
;
55 cfg
.check_timeout
= 20;
57 m_log
= reinterpret_cast<logger
*>(l
.clone());
58 cfg
.log
= m_log
->get_dnet_log();
60 snprintf(cfg
.addr
, sizeof(cfg
.addr
), "0.0.0.0");
61 snprintf(cfg
.port
, sizeof(cfg
.port
), "0");
63 m_node
= dnet_node_create(&cfg
);
66 throw std::bad_alloc();
70 node::node(logger
&l
, struct dnet_config
&cfg
) : m_node(NULL
), m_log(NULL
)
72 cfg
.sock_type
= SOCK_STREAM
;
73 cfg
.proto
= IPPROTO_TCP
;
75 m_log
= reinterpret_cast<logger
*>(l
.clone());
76 cfg
.log
= m_log
->get_dnet_log();
78 snprintf(cfg
.addr
, sizeof(cfg
.addr
), "0.0.0.0");
79 snprintf(cfg
.port
, sizeof(cfg
.port
), "0");
81 m_node
= dnet_node_create(&cfg
);
84 throw std::bad_alloc();
88 node::node(logger
&l
, const std::string
&config_path
) : m_node(NULL
), m_log(NULL
)
90 struct dnet_config cfg
;
91 memset(&cfg
, 0, sizeof(struct dnet_config
));
93 cfg
.sock_type
= SOCK_STREAM
;
94 cfg
.proto
= IPPROTO_TCP
;
96 m_log
= reinterpret_cast<logger
*>(l
.clone());
97 cfg
.log
= m_log
->get_dnet_log();
99 std::list
<addr_tuple
> remotes
;
100 std::vector
<int> groups
;
102 parse_config(config_path
, cfg
, remotes
, groups
, cfg
.log
->log_level
);
104 m_node
= dnet_node_create(&cfg
);
107 throw std::bad_alloc();
111 for (std::list
<addr_tuple
>::iterator it
= remotes
.begin(); it
!= remotes
.end(); ++it
) {
113 add_remote(it
->host
.c_str(), it
->port
, it
->family
);
122 dnet_node_destroy(m_node
);
126 void node::parse_config(const std::string
&path
, struct dnet_config
&cfg
,
127 std::list
<addr_tuple
> &remotes
,
128 std::vector
<int> &groups
,
131 std::ifstream
in(path
.c_str());
135 while (!in
.eof() && in
.good()) {
138 in
.getline((char *)line
.data(), line
.size());
139 size_t len
= in
.gcount();
143 if (in
.eof() || !in
.good())
149 if (line
.size() < 3 || line
.data()[0] == '#')
152 std::vector
<std::string
> strs
;
153 boost::split(strs
, line
, boost::is_any_of("="));
155 std::string key
= strs
[0];
158 if (strs
.size() != 2) {
159 std::ostringstream str
;
160 str
<< path
<< ": invalid elliptics config: line: " << line_num
<<
161 ", key: " << key
<< "': string is broken: size: " << strs
.size();
162 throw std::runtime_error(str
.str());
164 std::string value
= strs
[1];
167 if (key
== "remote") {
168 std::vector
<std::string
> rem
;
169 boost::split(rem
, value
, boost::is_any_of(" "));
171 for (std::vector
<std::string
>::iterator it
= rem
.begin(); it
!= rem
.end(); ++it
) {
172 std::string addr_str
= *it
;
173 if (dnet_parse_addr((char *)addr_str
.c_str(), &cfg
)) {
174 std::ostringstream str
;
175 str
<< path
<< ": invalid elliptics config: '" << key
<< "' ";
176 str
<< path
<< ": invalid elliptics config: line: " << line_num
<<
177 ", key: '" << key
<< "': remote addr is invalid";
178 throw std::runtime_error(str
.str());
181 addr_tuple
addr(cfg
.addr
, atoi(cfg
.port
), cfg
.family
);
182 remotes
.push_back(addr
);
186 if (key
== "groups") {
187 std::vector
<std::string
> gr
;
188 boost::split(gr
, value
, boost::is_any_of(":"));
190 for (std::vector
<std::string
>::iterator it
= gr
.begin(); it
!= gr
.end(); ++it
) {
191 int group
= atoi(it
->c_str());
194 groups
.push_back(group
);
198 if (key
== "check_timeout")
199 cfg
.check_timeout
= strtoul(value
.c_str(), NULL
, 0);
200 if (key
== "wait_timeout")
201 cfg
.wait_timeout
= strtoul(value
.c_str(), NULL
, 0);
202 if (key
== "log_level")
203 log_level
= strtoul(value
.c_str(), NULL
, 0);
207 void node::add_groups(std::vector
<int> &groups
)
209 if (dnet_node_set_groups(m_node
, (int *)&groups
[0], groups
.size()))
210 throw std::bad_alloc();
211 this->groups
= groups
;
214 void node::add_remote(const char *addr
, const int port
, const int family
)
216 struct dnet_config cfg
;
219 memset(&cfg
, 0, sizeof(cfg
));
222 snprintf(cfg
.addr
, sizeof(cfg
.addr
), "%s", addr
);
223 snprintf(cfg
.port
, sizeof(cfg
.port
), "%d", port
);
225 err
= dnet_add_state(m_node
, &cfg
);
227 std::ostringstream str
;
228 str
<< "Failed to add remote addr " << addr
<< ":" << port
<< ": " << err
;
229 throw std::runtime_error(str
.str());
233 void node::read_file(struct dnet_id
&id
, const std::string
&file
, uint64_t offset
, uint64_t size
)
237 err
= dnet_read_file_id(m_node
, file
.c_str(), &id
, offset
, size
);
239 std::ostringstream str
;
240 str
<< dnet_dump_id(&id
) << ": READ: " << file
<< ": offset: " << offset
<< ", size: " << size
<< ": " << err
;
241 throw std::runtime_error(str
.str());
245 void node::read_file(const std::string
&remote
, const std::string
&file
, uint64_t offset
, uint64_t size
, int type
)
249 err
= dnet_read_file(m_node
, file
.c_str(), remote
.data(), remote
.size(), offset
, size
, type
);
252 transform(remote
, id
);
255 std::ostringstream str
;
256 str
<< dnet_dump_id(&id
) << ": READ: " << file
<< ": offset: " << offset
<< ", size: " << size
<< ": " << err
;
257 throw std::runtime_error(str
.str());
261 void node::write_file(struct dnet_id
&id
, const std::string
&file
, uint64_t local_offset
,
262 uint64_t offset
, uint64_t size
, uint64_t cflags
, unsigned int ioflags
)
264 int err
= dnet_write_file_id(m_node
, file
.c_str(), &id
, local_offset
, offset
, size
, cflags
, ioflags
);
266 std::ostringstream str
;
267 str
<< dnet_dump_id(&id
) << ": WRITE: " << file
<< ", local_offset: " << local_offset
<<
268 ", offset: " << offset
<< ", size: " << size
<< ": " << err
;
269 throw std::runtime_error(str
.str());
272 void node::write_file(const std::string
&remote
, const std::string
&file
, uint64_t local_offset
, uint64_t offset
, uint64_t size
,
273 uint64_t cflags
, unsigned int ioflags
, int type
)
275 int err
= dnet_write_file(m_node
, file
.c_str(), remote
.data(), remote
.size(),
276 local_offset
, offset
, size
, cflags
, ioflags
, type
);
279 transform(remote
, id
);
282 std::ostringstream str
;
283 str
<< dnet_dump_id(&id
) << ": WRITE: " << file
<< ", local_offset: " << local_offset
<<
284 ", offset: " << offset
<< ", size: " << size
<< ": " << err
;
285 throw std::runtime_error(str
.str());
289 std::string
node::read_data_wait(struct dnet_id
&id
, uint64_t offset
, uint64_t size
,
290 uint64_t cflags
, uint32_t ioflags
)
292 struct dnet_io_attr io
;
295 memset(&io
, 0, sizeof(io
));
301 memcpy(io
.id
, id
.id
, DNET_ID_SIZE
);
302 memcpy(io
.parent
, id
.id
, DNET_ID_SIZE
);
304 void *data
= dnet_read_data_wait(m_node
, &id
, &io
, cflags
, &err
);
306 std::ostringstream str
;
307 str
<< dnet_dump_id(&id
) << ": READ: size: " << size
<< ": err: " << strerror(-err
) << ": " << err
;
308 throw std::runtime_error(str
.str());
311 std::string ret
= std::string((const char *)data
+ sizeof(struct dnet_io_attr
), io
.size
- sizeof(struct dnet_io_attr
));
317 std::string
node::read_data_wait(const std::string
&remote
, uint64_t offset
, uint64_t size
,
318 uint64_t cflags
, uint32_t ioflags
, int type
)
322 transform(remote
, id
);
325 return read_data_wait(id
, offset
, size
, cflags
, ioflags
);
328 void node::prepare_latest(struct dnet_id
&id
, uint64_t cflags
, std::vector
<int> &groups
)
330 struct dnet_read_latest_prepare pr
;
333 memset(&pr
, 0, sizeof(struct dnet_read_latest_prepare
));
339 pr
.group
= (int *)malloc(groups
.size() * sizeof(int));
341 std::ostringstream str
;
343 str
<< dnet_dump_id(&id
) << ": prepare_latest: allocation failure: group num: " << groups
.size();
344 throw std::runtime_error(str
.str());
346 pr
.group_num
= groups
.size();
348 for (unsigned i
= 0; i
< groups
.size(); ++i
)
349 pr
.group
[i
] = groups
[i
];
351 err
= dnet_read_latest_prepare(&pr
);
356 for (int i
= 0; i
< pr
.group_num
; ++i
)
357 groups
.push_back(pr
.group
[i
]);
370 std::ostringstream str
;
372 str
<< dnet_dump_id(&id
) << ": prepare_latest: groups: " << groups
.size() << ": err: " << strerror(-err
) << ": " << err
;
373 throw std::runtime_error(str
.str());
377 std::string
node::read_latest(struct dnet_id
&id
, uint64_t offset
, uint64_t size
,
378 uint64_t cflags
, uint32_t ioflags
)
380 struct dnet_io_attr io
;
384 memset(&io
, 0, sizeof(io
));
389 io
.num
= groups
.size();
391 memcpy(io
.id
, id
.id
, DNET_ID_SIZE
);
392 memcpy(io
.parent
, id
.id
, DNET_ID_SIZE
);
394 err
= dnet_read_latest(m_node
, &id
, &io
, cflags
, &data
);
396 std::ostringstream str
;
397 str
<< dnet_dump_id(&id
) << ": READ: size: " << size
<< ": err: " << strerror(-err
) << ": " << err
;
398 throw std::runtime_error(str
.str());
401 std::string ret
= std::string((const char *)data
+ sizeof(struct dnet_io_attr
), io
.size
- sizeof(struct dnet_io_attr
));
407 std::string
node::read_latest(const std::string
&remote
, uint64_t offset
, uint64_t size
,
408 uint64_t cflags
, uint32_t ioflags
, int type
)
412 transform(remote
, id
);
415 return read_latest(id
, offset
, size
, cflags
, ioflags
);
418 std::string
node::write_cache(struct dnet_id
&id
, const std::string
&str
,
419 uint64_t cflags
, unsigned int ioflags
, long timeout
)
421 struct dnet_io_control ctl
;
423 memset(&ctl
, 0, sizeof(ctl
));
426 ctl
.data
= str
.data();
428 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_CACHE
;
429 ctl
.io
.start
= timeout
;
430 ctl
.io
.size
= str
.size();
431 ctl
.io
.type
= id
.type
;
432 ctl
.io
.num
= str
.size();
434 memcpy(&ctl
.id
, &id
, sizeof(struct dnet_id
));
439 int err
= dnet_write_data_wait(m_node
, &ctl
, (void **)&result
);
441 std::ostringstream string
;
442 string
<< dnet_dump_id(&id
) << ": WRITE: size: " << str
.size() << ", err: " << err
;
443 throw std::runtime_error(string
.str());
446 std::string
ret((const char *)result
, err
);
452 std::string
node::write_cache(const std::string
&key
, const std::string
&str
,
453 uint64_t cflags
, unsigned int ioflags
, long timeout
)
461 return write_cache(id
, str
, cflags
, ioflags
, timeout
);
464 std::string
node::write_data_wait(struct dnet_id
&id
, const std::string
&str
,
465 uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
)
467 struct dnet_io_control ctl
;
469 memset(&ctl
, 0, sizeof(ctl
));
472 ctl
.data
= str
.data();
474 ctl
.io
.flags
= ioflags
;
475 ctl
.io
.offset
= remote_offset
;
476 ctl
.io
.size
= str
.size();
477 ctl
.io
.type
= id
.type
;
478 ctl
.io
.num
= str
.size() + remote_offset
;
480 memcpy(&ctl
.id
, &id
, sizeof(struct dnet_id
));
485 int err
= dnet_write_data_wait(m_node
, &ctl
, (void **)&result
);
487 std::ostringstream string
;
488 string
<< dnet_dump_id(&id
) << ": WRITE: size: " << str
.size() << ", err: " << err
;
489 throw std::runtime_error(string
.str());
492 std::string
ret((const char *)result
, err
);
498 std::string
node::write_data_wait(const std::string
&remote
, const std::string
&str
,
499 uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
, int type
)
503 transform(remote
, id
);
507 return write_data_wait(id
, str
, remote_offset
, cflags
, ioflags
);
510 std::string
node::lookup_addr(const std::string
&remote
, const int group_id
)
514 int err
= dnet_lookup_addr(m_node
, remote
.data(), remote
.size(), NULL
, group_id
, buf
, sizeof(buf
));
516 std::ostringstream str
;
517 str
<< "Failed to lookup in group " << group_id
<< ": key size: " << remote
.size() << ", err: " << err
;
518 throw std::runtime_error(str
.str());
521 return std::string((const char *)buf
, strlen(buf
));
524 std::string
node::lookup_addr(const struct dnet_id
&id
)
528 int err
= dnet_lookup_addr(m_node
, NULL
, 0, (struct dnet_id
*)&id
, id
.group_id
, buf
, sizeof(buf
));
530 std::ostringstream str
;
531 str
<< "Failed to lookup " << dnet_dump_id(&id
) << ": err: " << err
;
532 throw std::runtime_error(str
.str());
535 return std::string((const char *)buf
, strlen(buf
));
538 std::string
node::create_metadata(const struct dnet_id
&id
, const std::string
&obj
,
539 const std::vector
<int> &groups
, const struct timespec
&ts
)
541 struct dnet_metadata_control ctl
;
542 struct dnet_meta_container mc
;
545 memset(&mc
, 0, sizeof(struct dnet_meta_container
));
546 memset(&ctl
, 0, sizeof(struct dnet_metadata_control
));
548 ctl
.obj
= (char *)obj
.data();
549 ctl
.len
= obj
.size();
551 ctl
.groups
= (int *)&groups
[0];
552 ctl
.group_num
= groups
.size();
557 err
= dnet_create_metadata(m_node
, &ctl
, &mc
);
559 std::ostringstream str
;
560 str
<< "Failed to create metadata: key: " << dnet_dump_id(&id
) << ", err: " << err
;
561 throw std::runtime_error(str
.str());
567 ret
.assign((char *)mc
.data
, mc
.size
);
577 int node::write_metadata(const struct dnet_id
&id
, const std::string
&obj
,
578 const std::vector
<int> &groups
, const struct timespec
&ts
, uint64_t cflags
)
582 struct dnet_meta_container mc
;
584 if (dnet_flags(m_node
) & DNET_CFG_NO_META
)
587 meta
= create_metadata(id
, obj
, groups
, ts
);
589 mc
.data
= (void *)meta
.data();
590 mc
.size
= meta
.size();
594 err
= dnet_write_metadata(m_node
, &mc
, 1, cflags
);
596 std::ostringstream str
;
597 str
<< "Failed to write metadata: key: " << dnet_dump_id(&id
) << ", err: " << err
;
598 throw std::runtime_error(str
.str());
604 void node::transform(const std::string
&data
, struct dnet_id
&id
)
606 dnet_transform(m_node
, (void *)data
.data(), data
.size(), &id
);
609 void node::lookup(const struct dnet_id
&id
, const callback
&c
)
611 int err
= dnet_lookup_object(m_node
, (struct dnet_id
*)&id
, 0,
612 callback::complete_callback
,
616 std::ostringstream str
;
617 str
<< "Failed to lookup ID " << dnet_dump_id(&id
) << ": " << err
;
618 throw std::runtime_error(str
.str());
622 void node::lookup(const std::string
&data
, const callback
&c
)
625 int error
= -ENOENT
, i
, num
, *g
;
630 num
= dnet_mix_states(m_node
, &id
, &g
);
632 throw std::bad_alloc();
634 for (i
=0; i
<num
; ++i
) {
650 std::ostringstream str
;
651 str
<< "Failed to lookup data object: key: " << dnet_dump_id(&id
);
652 throw std::runtime_error(str
.str());
656 std::string
node::lookup(const std::string
&data
)
659 int error
= -ENOENT
, i
, num
, *g
;
665 num
= dnet_mix_states(m_node
, &id
, &g
);
667 throw std::bad_alloc();
669 for (i
=0; i
<num
; ++i
) {
677 if (ret
.size() < sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
)) {
678 std::stringstream str
;
680 str
<< dnet_dump_id(&id
) << ": failed to receive lookup request";
681 throw std::runtime_error(str
.str());
683 /* reply parsing examaple */
685 struct dnet_addr
*addr
= (struct dnet_addr
*)ret
.data();
686 struct dnet_cmd
*cmd
= (struct dnet_cmd
*)(addr
+ 1);
688 if (cmd
->size
> sizeof(struct dnet_addr_attr
)) {
689 struct dnet_addr_attr
*a
= (struct dnet_addr_attr
*)(cmd
+ 1);
690 struct dnet_file_info
*info
= (struct dnet_file_info
*)(a
+ 1);
692 dnet_convert_addr_attr(a
);
693 dnet_convert_file_info(info
);
696 dnet_log_raw(m_node
, DNET_LOG_DEBUG
, "%s: %s: %zu bytes\n", dnet_dump_id(&id
), data
.c_str(), ret
.size());
699 } catch (const std::exception
&e
) {
700 dnet_log_raw(m_node
, DNET_LOG_ERROR
, "%s: %s : %s\n", dnet_dump_id(&id
), e
.what(), data
.c_str());
708 std::ostringstream str
;
709 str
<< data
<< ": could not find object";
711 throw std::runtime_error(str
.str());
717 std::string
node::lookup(const struct dnet_id
&id
)
728 if (ret
.size() < sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
)) {
729 std::stringstream str
;
731 str
<< dnet_dump_id(&id
) << ": failed to receive lookup request";
732 throw std::runtime_error(str
.str());
735 dnet_log_raw(m_node
, DNET_LOG_DEBUG
, "%s: %zu bytes\n", dnet_dump_id(&id
), ret
.size());
737 } catch (const std::exception
&e
) {
738 dnet_log_raw(m_node
, DNET_LOG_ERROR
, "%s: %s\n", dnet_dump_id(&id
), e
.what());
742 std::ostringstream str
;
743 str
<< dnet_dump_id(&id
) << ": could not find object";
745 throw std::runtime_error(str
.str());
751 void node::remove_raw(struct dnet_id
&id
, uint64_t cflags
, uint64_t ioflags
)
754 std::vector
<int> g
= groups
;
756 for (int i
=0; i
<(int)g
.size(); ++i
) {
759 if (!dnet_remove_object_now(m_node
, &id
, cflags
, ioflags
))
764 std::ostringstream str
;
765 str
<< dnet_dump_id(&id
) << ": REMOVE: " << err
;
766 throw std::runtime_error(str
.str());
770 void node::remove(struct dnet_id
&id
)
772 remove_raw(id
, 0, 0);
775 void node::remove_raw(const std::string
&data
, int type
, uint64_t cflags
, uint64_t ioflags
)
782 remove_raw(id
, cflags
, ioflags
);
785 void node::remove(const std::string
&data
, int type
)
787 remove_raw(data
, type
, 0, 0);
790 std::string
node::stat_log()
796 err
= dnet_request_stat(m_node
, NULL
, DNET_CMD_STAT
, 0,
797 callback::complete_callback
, (void *)&c
);
799 std::ostringstream str
;
800 str
<< "Failed to request statistics: " << err
;
801 throw std::runtime_error(str
.str());
806 /* example reply parsing */
809 const void *data
= ret
.data();
810 int size
= ret
.size();
811 char id_str
[DNET_ID_SIZE
*2 + 1];
815 struct dnet_addr
*addr
= (struct dnet_addr
*)data
;
816 struct dnet_cmd
*cmd
= (struct dnet_cmd
*)(addr
+ 1);
817 struct dnet_stat
*st
= (struct dnet_stat
*)(cmd
+ 1);
819 dnet_convert_stat(st
);
821 la
[0] = (float)st
->la
[0] / 100.0;
822 la
[1] = (float)st
->la
[1] / 100.0;
823 la
[2] = (float)st
->la
[2] / 100.0;
825 printf("<stat addr=\"%s\" id=\"%s\"><la>%.2f %.2f %.2f</la>"
826 "<memtotal>%llu KB</memtotal><memfree>%llu KB</memfree><memcached>%llu KB</memcached>"
827 "<storage_size>%llu MB</storage_size><available_size>%llu MB</available_size>"
828 "<files>%llu</files><fsid>0x%llx</fsid></stat>",
829 dnet_server_convert_dnet_addr_raw(addr
, addr_str
, sizeof(addr_str
)),
830 dnet_dump_id_len_raw(cmd
->id
.id
, DNET_ID_SIZE
, id_str
),
832 (unsigned long long)st
->vm_total
,
833 (unsigned long long)st
->vm_free
,
834 (unsigned long long)st
->vm_cached
,
835 (unsigned long long)(st
->frsize
* st
->blocks
/ 1024 / 1024),
836 (unsigned long long)(st
->bavail
* st
->bsize
/ 1024 / 1024),
837 (unsigned long long)st
->files
, (unsigned long long)st
->fsid
);
840 int sz
= sizeof(*addr
) + sizeof(*cmd
) + cmd
->size
;
847 if (ret
.size() < sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
) + sizeof(struct dnet_stat
))
848 throw std::runtime_error("Failed to request statistics: not enough data returned");
852 int node::state_num(void)
854 return dnet_state_num(m_node
);
857 int node::request_cmd(struct dnet_trans_control
&ctl
)
861 err
= dnet_request_cmd(m_node
, &ctl
);
863 std::ostringstream str
;
864 str
<< dnet_dump_id(&ctl
.id
) << ": failed to request cmd: " << dnet_cmd_string(ctl
.cmd
) << ": " << err
;
865 throw std::runtime_error(str
.str());
871 void node::update_status(const char *saddr
, const int port
, const int family
, struct dnet_node_status
*status
)
874 struct dnet_addr addr
;
877 memset(&addr
, 0, sizeof(addr
));
878 addr
.addr_len
= sizeof(addr
.addr
);
880 snprintf(sport
, sizeof(sport
), "%d", port
);
882 err
= dnet_fill_addr(&addr
, saddr
, sport
, family
, SOCK_STREAM
, IPPROTO_TCP
);
884 err
= dnet_update_status(m_node
, &addr
, NULL
, status
);
887 std::ostringstream str
;
888 str
<< saddr
<< ":" << port
<< ": failed to request set status " << std::hex
<< status
<< ": " << err
;
889 throw std::runtime_error(str
.str());
893 void node::update_status(struct dnet_id
&id
, struct dnet_node_status
*status
)
897 err
= dnet_update_status(m_node
, NULL
, &id
, status
);
899 std::ostringstream str
;
901 str
<< dnet_dump_id(&id
) << ": failed to request set status " << std::hex
<< status
<< ": " << err
;
902 throw std::runtime_error(str
.str());
906 struct range_sort_compare
{
907 bool operator () (const std::string
&s1
, const std::string
&s2
) {
908 unsigned char *id1
= (unsigned char *)s1
.data();
909 unsigned char *id2
= (unsigned char *)s2
.data();
911 int cmp
= dnet_id_cmp_str(id1
, id2
);
917 std::vector
<std::string
> node::read_data_range(struct dnet_io_attr
&io
, int group_id
, uint64_t cflags
)
919 struct dnet_range_data
*data
;
921 uint32_t ioflags
= io
.flags
;
924 data
= dnet_read_range(m_node
, &io
, group_id
, cflags
, &err
);
926 std::ostringstream str
;
927 str
<< "Failed to read range data object: group: " << group_id
<<
928 ", key: " << dnet_dump_id_str(io
.id
) <<
929 ", size: " << io
.size
<< ": err: " << strerror(-err
) << ": " << err
;
930 throw std::runtime_error(str
.str());
933 std::vector
<std::string
> ret
;
936 for (int i
= 0; i
< err
; ++i
) {
937 struct dnet_range_data
*d
= &data
[i
];
938 char *data
= (char *)d
->data
;
940 if (!(ioflags
& DNET_IO_FLAGS_NODATA
)) {
942 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)data
;
944 dnet_convert_io_attr(io
);
946 uint64_t size
= dnet_bswap64(io
->size
);
950 str
.append((char *)io
->id
, DNET_ID_SIZE
);
951 str
.append((char *)&size
, 8);
952 str
.append((const char *)(io
+ 1), io
->size
);
956 data
+= sizeof(struct dnet_io_attr
) + io
->size
;
957 d
->size
-= sizeof(struct dnet_io_attr
) + io
->size
;
960 if (d
->size
!= sizeof(struct dnet_io_attr
)) {
961 std::ostringstream str
;
962 str
<< "Incorrect data size: d->size = " << d
->size
<<
963 "sizeof = " << sizeof(struct dnet_io_attr
);
964 throw std::runtime_error(str
.str());
966 struct dnet_io_attr
*rep
= (struct dnet_io_attr
*)data
;
975 if (ioflags
& DNET_IO_FLAGS_NODATA
) {
976 std::ostringstream str
;
978 ret
.push_back(str
.str());
985 std::vector
<struct dnet_io_attr
> node::remove_data_range(struct dnet_io_attr
&io
, int group_id
, uint64_t cflags
)
987 struct dnet_io_attr
*retp
;
991 retp
= dnet_remove_range(m_node
, &io
, group_id
, cflags
, &ret_num
, &err
);
994 std::ostringstream str
;
995 str
<< "Failed to read range data object: group: " << group_id
<<
996 ", key: " << dnet_dump_id_str(io
.id
) <<
997 ", size: " << io
.size
<< ": err: " << strerror(-err
) << ": " << err
;
998 throw std::runtime_error(str
.str());
1001 std::vector
<struct dnet_io_attr
> ret
;;
1004 for (int i
= 0; i
< ret_num
; ++i
) {
1005 ret
.push_back(retp
[i
]);
1014 std::string
node::write_prepare(const std::string
&remote
, const std::string
&str
, uint64_t remote_offset
,
1015 uint64_t psize
, uint64_t cflags
, unsigned int ioflags
, int type
)
1017 struct dnet_io_control ctl
;
1019 memset(&ctl
, 0, sizeof(ctl
));
1021 ctl
.cflags
= cflags
;
1022 ctl
.data
= str
.data();
1024 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_PREPARE
| DNET_IO_FLAGS_PLAIN_WRITE
;
1025 ctl
.io
.offset
= remote_offset
;
1026 ctl
.io
.size
= str
.size();
1030 transform(remote
, ctl
.id
);
1032 ctl
.id
.group_id
= 0;
1036 char *result
= NULL
;
1037 int err
= dnet_write_data_wait(m_node
, &ctl
, (void **)&result
);
1039 std::ostringstream string
;
1040 string
<< dnet_dump_id(&ctl
.id
) << ": " << remote
<< ": write_prepare: size: " << str
.size() << ", err: " << err
;
1041 throw std::runtime_error(string
.str());
1044 std::string
ret(result
, err
);
1050 std::string
node::write_commit(const std::string
&remote
, const std::string
&str
, uint64_t remote_offset
, uint64_t csize
,
1051 uint64_t cflags
, unsigned int ioflags
, int type
)
1053 struct dnet_io_control ctl
;
1055 memset(&ctl
, 0, sizeof(ctl
));
1057 ctl
.cflags
= cflags
;
1058 ctl
.data
= str
.data();
1060 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_COMMIT
| DNET_IO_FLAGS_PLAIN_WRITE
;
1061 ctl
.io
.offset
= remote_offset
;
1062 ctl
.io
.size
= str
.size();
1066 transform(remote
, ctl
.id
);
1068 ctl
.id
.group_id
= 0;
1072 char *result
= NULL
;
1073 int err
= dnet_write_data_wait(m_node
, &ctl
, (void **)&result
);
1075 std::ostringstream string
;
1076 string
<< dnet_dump_id(&ctl
.id
) << ": " << remote
<< ": write_commit: size: " << str
.size() << ", err: " << err
;
1077 throw std::runtime_error(string
.str());
1080 std::string
ret(result
, err
);
1086 std::string
node::write_plain(const std::string
&remote
, const std::string
&str
, uint64_t remote_offset
,
1087 uint64_t cflags
, unsigned int ioflags
, int type
)
1089 struct dnet_io_control ctl
;
1091 memset(&ctl
, 0, sizeof(ctl
));
1093 ctl
.cflags
= cflags
;
1094 ctl
.data
= str
.data();
1096 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_PLAIN_WRITE
;
1097 ctl
.io
.offset
= remote_offset
;
1098 ctl
.io
.size
= str
.size();
1101 transform(remote
, ctl
.id
);
1103 ctl
.id
.group_id
= 0;
1107 char *result
= NULL
;
1108 int err
= dnet_write_data_wait(m_node
, &ctl
, (void **)&result
);
1110 std::ostringstream string
;
1111 string
<< dnet_dump_id(&ctl
.id
) << ": " << remote
<< ": write_plain: size: " << str
.size() << ", err: " << err
;
1112 throw std::runtime_error(string
.str());
1115 std::string
ret(result
, err
);
1121 std::vector
<std::pair
<struct dnet_id
, struct dnet_addr
> > node::get_routes()
1123 std::vector
<std::pair
<struct dnet_id
, struct dnet_addr
> > res
;
1124 struct dnet_id
*ids
= NULL
;
1125 struct dnet_addr
*addrs
= NULL
;
1129 count
= dnet_get_routes(m_node
, &ids
, &addrs
);
1132 for (int i
= 0; i
< count
; ++i
) {
1133 res
.push_back(std::make_pair(ids
[i
], addrs
[i
]));
1146 std::string
node::request(struct dnet_id
*id
, struct sph
*sph
, bool lock
)
1148 std::string ret_str
;
1154 err
= dnet_send_cmd(m_node
, id
, sph
, &ret
);
1156 err
= dnet_send_cmd_nolock(m_node
, id
, sph
, &ret
);
1159 std::ostringstream str
;
1161 str
<< dnet_dump_id(id
) << ": failed to send request: " << strerror(-err
) << ": " << err
;
1162 throw std::runtime_error(str
.str());
1167 ret_str
.assign((char *)ret
, err
);
1178 std::string
node::raw_exec(struct dnet_id
*id
, const struct sph
*orig_sph
,
1179 const std::string
&event
, const std::string
&data
, const std::string
&binary
, bool lock
)
1181 std::vector
<char> vec(event
.size() + data
.size() + binary
.size() + sizeof(struct sph
));
1182 std::string ret_str
;
1184 struct sph
*sph
= (struct sph
*)&vec
[0];
1186 memset(sph
, 0, sizeof(struct sph
));
1189 sph
->flags
&= ~DNET_SPH_FLAGS_SRC_BLOCK
;
1191 sph
->flags
= DNET_SPH_FLAGS_SRC_BLOCK
;
1192 memcpy(sph
->src
.id
, id
->id
, sizeof(sph
->src
.id
));
1195 sph
->data_size
= data
.size();
1196 sph
->binary_size
= binary
.size();
1197 sph
->event_size
= event
.size();
1199 memcpy(sph
->data
, event
.data(), event
.size());
1200 memcpy(sph
->data
+ event
.size(), data
.data(), data
.size());
1201 memcpy(sph
->data
+ event
.size() + data
.size(), binary
.data(), binary
.size());
1203 return request(id
, sph
, lock
);
1206 std::string
node::exec_locked(struct dnet_id
*id
, const std::string
&event
, const std::string
&data
, const std::string
&binary
)
1208 return raw_exec(id
, NULL
, event
, data
, binary
, true);
1211 std::string
node::exec_unlocked(struct dnet_id
*id
, const std::string
&event
, const std::string
&data
, const std::string
&binary
)
1213 return raw_exec(id
, NULL
, event
, data
, binary
, false);
1216 std::string
node::push_locked(struct dnet_id
*id
, const struct sph
&sph
, const std::string
&event
,
1217 const std::string
&data
, const std::string
&binary
)
1219 return raw_exec(id
, &sph
, event
, data
, binary
, true);
1222 std::string
node::push_unlocked(struct dnet_id
*id
, const struct sph
&sph
, const std::string
&event
,
1223 const std::string
&data
, const std::string
&binary
)
1225 return raw_exec(id
, &sph
, event
, data
, binary
, false);
1228 void node::reply(const struct sph
&orig_sph
, const std::string
&event
, const std::string
&data
, const std::string
&binary
)
1230 std::vector
<char> vec(event
.size() + data
.size() + binary
.size() + sizeof(struct sph
));
1231 std::string ret_str
;
1233 struct sph
*sph
= (struct sph
*)&vec
[0];
1237 sph
->data_size
= data
.size();
1238 sph
->binary_size
= binary
.size();
1239 sph
->event_size
= event
.size();
1241 memcpy(sph
->data
, event
.data(), event
.size());
1242 memcpy(sph
->data
+ event
.size(), data
.data(), data
.size());
1243 memcpy(sph
->data
+ event
.size() + data
.size(), binary
.data(), binary
.size());
1246 dnet_setup_id(&id
, 0, sph
->src
.id
);
1249 request(&id
, sph
, false);
1253 bool dnet_io_attr_compare(const struct dnet_io_attr
&io1
, const struct dnet_io_attr
&io2
) {
1256 cmp
= dnet_id_cmp_str(io1
.id
, io2
.id
);
1261 std::vector
<std::string
> node::bulk_read(const std::vector
<struct dnet_io_attr
> &ios
, uint64_t cflags
)
1263 struct dnet_range_data
*data
;
1266 num
= dnet_mix_states(m_node
, NULL
, &g
);
1268 throw std::runtime_error("could not fetch groups: " + std::string(strerror(num
)));
1270 std::vector
<int> groups
;
1272 groups
.assign(g
, g
+ num
);
1279 std::vector
<struct dnet_io_attr
> tmp_ios
= ios
;
1280 std::sort(tmp_ios
.begin(), tmp_ios
.end(), dnet_io_attr_compare
);
1282 std::vector
<std::string
> ret
;
1284 for (std::vector
<int>::iterator group
= groups
.begin(); group
!= groups
.end(); ++group
) {
1285 if (!tmp_ios
.size())
1288 data
= dnet_bulk_read(m_node
, (struct dnet_io_attr
*)(&tmp_ios
[0]), tmp_ios
.size(), *group
, cflags
, &err
);
1290 std::ostringstream str
;
1291 str
<< "Failed to read bulk data: group: " << *group
<<
1292 ": err: " << strerror(-err
) << ": " << err
;
1293 throw std::runtime_error(str
.str());
1297 for (int i
= 0; i
< err
; ++i
) {
1298 struct dnet_range_data
*d
= &data
[i
];
1299 char *data
= (char *)d
->data
;
1302 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)data
;
1304 for (std::vector
<struct dnet_io_attr
>::iterator it
= tmp_ios
.begin(); it
!= tmp_ios
.end(); ++it
) {
1305 int cmp
= dnet_id_cmp_str(it
->id
, io
->id
);
1313 dnet_convert_io_attr(io
);
1315 uint64_t size
= dnet_bswap64(io
->size
);
1319 str
.append((char *)io
->id
, DNET_ID_SIZE
);
1320 str
.append((char *)&size
, 8);
1321 str
.append((const char *)(io
+ 1), io
->size
);
1325 data
+= sizeof(struct dnet_io_attr
) + io
->size
;
1326 d
->size
-= sizeof(struct dnet_io_attr
) + io
->size
;
1339 std::vector
<std::string
> node::bulk_read(const std::vector
<std::string
> &keys
, uint64_t cflags
)
1341 std::vector
<struct dnet_io_attr
> ios
;
1342 struct dnet_io_attr io
;
1343 memset(&io
, 0, sizeof(io
));
1345 ios
.reserve(keys
.size());
1347 for (size_t i
= 0; i
< keys
.size(); ++i
) {
1350 transform(keys
[i
], id
);
1351 memcpy(io
.id
, id
.id
, sizeof(io
.id
));
1355 return bulk_read(ios
, cflags
);
1358 std::string
node::bulk_write(const std::vector
<struct dnet_io_attr
> &ios
, const std::vector
<std::string
> &data
, uint64_t cflags
)
1360 std::vector
<struct dnet_io_control
> ctls
;
1364 if (ios
.size() != data
.size()) {
1365 std::ostringstream string
;
1366 string
<< "BULK_WRITE: ios doesn't meet data: io.size: " << ios
.size() << ", data.size: " << data
.size();
1367 throw std::runtime_error(string
.str());
1370 ctls
.reserve(ios
.size());
1372 for(i
= 0; i
< ios
.size(); ++i
) {
1373 struct dnet_io_control ctl
;
1374 memset(&ctl
, 0, sizeof(ctl
));
1376 ctl
.cflags
= cflags
;
1377 ctl
.data
= data
[i
].data();
1381 dnet_setup_id(&ctl
.id
, 0, (unsigned char *)ios
[i
].id
);
1382 ctl
.id
.type
= ios
[i
].type
;
1386 ctls
.push_back(ctl
);
1389 struct dnet_range_data ret
= dnet_bulk_write(m_node
, &ctls
[0], ctls
.size(), &err
);
1391 std::ostringstream string
;
1392 string
<< "BULK_WRITE: size: " << ret
.size
<< ", err: " << err
;
1393 throw std::runtime_error(string
.str());
1396 std::string
ret_str((const char *)ret
.data
, ret
.size
);
1402 void node::set_timeouts(const int wait_timeout
, const int check_timeout
)
1404 dnet_set_timeouts(m_node
, wait_timeout
, check_timeout
);