2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #define _XOPEN_SOURCE 600
18 #include <sys/types.h>
20 #include <sys/socket.h>
41 #include <boost/algorithm/string.hpp>
42 #include <elliptics/cppdef.h>
44 using namespace ioremap::elliptics
;
46 node::node(logger
&l
) : m_node(NULL
), m_log(NULL
)
48 struct dnet_config cfg
;
50 memset(&cfg
, 0, sizeof(cfg
));
52 cfg
.sock_type
= SOCK_STREAM
;
53 cfg
.proto
= IPPROTO_TCP
;
55 cfg
.check_timeout
= 20;
57 m_log
= reinterpret_cast<logger
*>(l
.clone());
58 cfg
.log
= m_log
->get_dnet_log();
60 snprintf(cfg
.addr
, sizeof(cfg
.addr
), "0.0.0.0");
61 snprintf(cfg
.port
, sizeof(cfg
.port
), "0");
63 m_node
= dnet_node_create(&cfg
);
66 throw std::bad_alloc();
70 node::node(logger
&l
, struct dnet_config
&cfg
) : m_node(NULL
), m_log(NULL
)
72 cfg
.sock_type
= SOCK_STREAM
;
73 cfg
.proto
= IPPROTO_TCP
;
75 m_log
= reinterpret_cast<logger
*>(l
.clone());
76 cfg
.log
= m_log
->get_dnet_log();
78 snprintf(cfg
.addr
, sizeof(cfg
.addr
), "0.0.0.0");
79 snprintf(cfg
.port
, sizeof(cfg
.port
), "0");
81 m_node
= dnet_node_create(&cfg
);
84 throw std::bad_alloc();
88 node::node(logger
&l
, const std::string
&config_path
) : m_node(NULL
), m_log(NULL
)
90 struct dnet_config cfg
;
91 memset(&cfg
, 0, sizeof(struct dnet_config
));
93 cfg
.sock_type
= SOCK_STREAM
;
94 cfg
.proto
= IPPROTO_TCP
;
96 m_log
= reinterpret_cast<logger
*>(l
.clone());
97 cfg
.log
= m_log
->get_dnet_log();
99 std::list
<addr_tuple
> remotes
;
100 std::vector
<int> groups
;
102 parse_config(config_path
, cfg
, remotes
, groups
, cfg
.log
->log_level
);
104 m_node
= dnet_node_create(&cfg
);
107 throw std::bad_alloc();
110 for (std::list
<addr_tuple
>::iterator it
= remotes
.begin(); it
!= remotes
.end(); ++it
) {
112 add_remote(it
->host
.c_str(), it
->port
, it
->family
);
121 dnet_node_destroy(m_node
);
125 session::session(node
&n
) : m_node(&n
)
127 m_session
= dnet_session_create(m_node
->m_node
);
130 throw std::bad_alloc();
137 void node::parse_config(const std::string
&path
, struct dnet_config
&cfg
,
138 std::list
<addr_tuple
> &remotes
,
139 std::vector
<int> &groups
,
142 std::ifstream
in(path
.c_str());
146 while (!in
.eof() && in
.good()) {
149 in
.getline((char *)line
.data(), line
.size());
150 size_t len
= in
.gcount();
154 if (in
.eof() || !in
.good())
160 if (line
.size() < 3 || line
.data()[0] == '#')
163 std::vector
<std::string
> strs
;
164 boost::split(strs
, line
, boost::is_any_of("="));
166 std::string key
= strs
[0];
169 if (strs
.size() != 2) {
170 std::ostringstream str
;
171 str
<< path
<< ": invalid elliptics config: line: " << line_num
<<
172 ", key: " << key
<< "': string is broken: size: " << strs
.size();
173 throw std::runtime_error(str
.str());
175 std::string value
= strs
[1];
178 if (key
== "remote") {
179 std::vector
<std::string
> rem
;
180 boost::split(rem
, value
, boost::is_any_of(" "));
182 for (std::vector
<std::string
>::iterator it
= rem
.begin(); it
!= rem
.end(); ++it
) {
183 std::string addr_str
= *it
;
184 if (dnet_parse_addr((char *)addr_str
.c_str(), &cfg
)) {
185 std::ostringstream str
;
186 str
<< path
<< ": invalid elliptics config: '" << key
<< "' ";
187 str
<< path
<< ": invalid elliptics config: line: " << line_num
<<
188 ", key: '" << key
<< "': remote addr is invalid";
189 throw std::runtime_error(str
.str());
192 addr_tuple
addr(cfg
.addr
, atoi(cfg
.port
), cfg
.family
);
193 remotes
.push_back(addr
);
197 if (key
== "groups") {
198 std::vector
<std::string
> gr
;
199 boost::split(gr
, value
, boost::is_any_of(":"));
201 for (std::vector
<std::string
>::iterator it
= gr
.begin(); it
!= gr
.end(); ++it
) {
202 int group
= atoi(it
->c_str());
205 groups
.push_back(group
);
209 if (key
== "check_timeout")
210 cfg
.check_timeout
= strtoul(value
.c_str(), NULL
, 0);
211 if (key
== "wait_timeout")
212 cfg
.wait_timeout
= strtoul(value
.c_str(), NULL
, 0);
213 if (key
== "log_level")
214 log_level
= strtoul(value
.c_str(), NULL
, 0);
218 void node::add_remote(const char *addr
, const int port
, const int family
)
220 struct dnet_config cfg
;
223 memset(&cfg
, 0, sizeof(cfg
));
226 snprintf(cfg
.addr
, sizeof(cfg
.addr
), "%s", addr
);
227 snprintf(cfg
.port
, sizeof(cfg
.port
), "%d", port
);
229 err
= dnet_add_state(m_node
, &cfg
);
231 std::ostringstream str
;
232 str
<< "Failed to add remote addr " << addr
<< ":" << port
<< ": " << err
;
233 throw std::runtime_error(str
.str());
237 void node::set_timeouts(const int wait_timeout
, const int check_timeout
)
239 dnet_set_timeouts(m_node
, wait_timeout
, check_timeout
);
242 void session::add_groups(std::vector
<int> &groups
)
244 if (dnet_session_set_groups(m_session
, (int *)&groups
[0], groups
.size()))
245 throw std::bad_alloc();
246 this->groups
= groups
;
249 void session::read_file(struct dnet_id
&id
, const std::string
&file
, uint64_t offset
, uint64_t size
)
253 err
= dnet_read_file_id(m_session
, file
.c_str(), &id
, offset
, size
);
255 std::ostringstream str
;
256 str
<< dnet_dump_id(&id
) << ": READ: " << file
<< ": offset: " << offset
<< ", size: " << size
<< ": " << err
;
257 throw std::runtime_error(str
.str());
261 void session::read_file(const std::string
&remote
, const std::string
&file
, uint64_t offset
, uint64_t size
, int type
)
265 err
= dnet_read_file(m_session
, file
.c_str(), remote
.data(), remote
.size(), offset
, size
, type
);
268 transform(remote
, id
);
271 std::ostringstream str
;
272 str
<< dnet_dump_id(&id
) << ": READ: " << file
<< ": offset: " << offset
<< ", size: " << size
<< ": " << err
;
273 throw std::runtime_error(str
.str());
277 void session::write_file(struct dnet_id
&id
, const std::string
&file
, uint64_t local_offset
,
278 uint64_t offset
, uint64_t size
, uint64_t cflags
, unsigned int ioflags
)
280 int err
= dnet_write_file_id(m_session
, file
.c_str(), &id
, local_offset
, offset
, size
, cflags
, ioflags
);
282 std::ostringstream str
;
283 str
<< dnet_dump_id(&id
) << ": WRITE: " << file
<< ", local_offset: " << local_offset
<<
284 ", offset: " << offset
<< ", size: " << size
<< ": " << err
;
285 throw std::runtime_error(str
.str());
288 void session::write_file(const std::string
&remote
, const std::string
&file
, uint64_t local_offset
, uint64_t offset
, uint64_t size
,
289 uint64_t cflags
, unsigned int ioflags
, int type
)
291 int err
= dnet_write_file(m_session
, file
.c_str(), remote
.data(), remote
.size(),
292 local_offset
, offset
, size
, cflags
, ioflags
, type
);
295 transform(remote
, id
);
298 std::ostringstream str
;
299 str
<< dnet_dump_id(&id
) << ": WRITE: " << file
<< ", local_offset: " << local_offset
<<
300 ", offset: " << offset
<< ", size: " << size
<< ": " << err
;
301 throw std::runtime_error(str
.str());
305 std::string
session::read_data_wait(struct dnet_id
&id
, uint64_t offset
, uint64_t size
,
306 uint64_t cflags
, uint32_t ioflags
)
308 struct dnet_io_attr io
;
311 memset(&io
, 0, sizeof(io
));
317 memcpy(io
.id
, id
.id
, DNET_ID_SIZE
);
318 memcpy(io
.parent
, id
.id
, DNET_ID_SIZE
);
320 void *data
= dnet_read_data_wait(m_session
, &id
, &io
, cflags
, &err
);
322 std::ostringstream str
;
323 str
<< dnet_dump_id(&id
) << ": READ: size: " << size
<< ": err: " << strerror(-err
) << ": " << err
;
324 throw std::runtime_error(str
.str());
327 std::string ret
= std::string((const char *)data
+ sizeof(struct dnet_io_attr
), io
.size
- sizeof(struct dnet_io_attr
));
333 std::string
session::read_data_wait(const std::string
&remote
, uint64_t offset
, uint64_t size
,
334 uint64_t cflags
, uint32_t ioflags
, int type
)
338 transform(remote
, id
);
341 return read_data_wait(id
, offset
, size
, cflags
, ioflags
);
344 void session::prepare_latest(struct dnet_id
&id
, uint64_t cflags
, std::vector
<int> &groups
)
346 struct dnet_read_latest_prepare pr
;
349 memset(&pr
, 0, sizeof(struct dnet_read_latest_prepare
));
355 pr
.group
= (int *)malloc(groups
.size() * sizeof(int));
357 std::ostringstream str
;
359 str
<< dnet_dump_id(&id
) << ": prepare_latest: allocation failure: group num: " << groups
.size();
360 throw std::runtime_error(str
.str());
362 pr
.group_num
= groups
.size();
364 for (unsigned i
= 0; i
< groups
.size(); ++i
)
365 pr
.group
[i
] = groups
[i
];
367 err
= dnet_read_latest_prepare(&pr
);
372 for (int i
= 0; i
< pr
.group_num
; ++i
)
373 groups
.push_back(pr
.group
[i
]);
386 std::ostringstream str
;
388 str
<< dnet_dump_id(&id
) << ": prepare_latest: groups: " << groups
.size() << ": err: " << strerror(-err
) << ": " << err
;
389 throw std::runtime_error(str
.str());
393 std::string
session::read_latest(struct dnet_id
&id
, uint64_t offset
, uint64_t size
,
394 uint64_t cflags
, uint32_t ioflags
)
396 struct dnet_io_attr io
;
400 memset(&io
, 0, sizeof(io
));
405 io
.num
= groups
.size();
407 memcpy(io
.id
, id
.id
, DNET_ID_SIZE
);
408 memcpy(io
.parent
, id
.id
, DNET_ID_SIZE
);
410 err
= dnet_read_latest(m_session
, &id
, &io
, cflags
, &data
);
412 std::ostringstream str
;
413 str
<< dnet_dump_id(&id
) << ": READ: size: " << size
<< ": err: " << strerror(-err
) << ": " << err
;
414 throw std::runtime_error(str
.str());
417 std::string ret
= std::string((const char *)data
+ sizeof(struct dnet_io_attr
), io
.size
- sizeof(struct dnet_io_attr
));
423 std::string
session::read_latest(const std::string
&remote
, uint64_t offset
, uint64_t size
,
424 uint64_t cflags
, uint32_t ioflags
, int type
)
428 transform(remote
, id
);
431 return read_latest(id
, offset
, size
, cflags
, ioflags
);
434 std::string
session::write_cache(struct dnet_id
&id
, const std::string
&str
,
435 uint64_t cflags
, unsigned int ioflags
, long timeout
)
437 struct dnet_io_control ctl
;
439 memset(&ctl
, 0, sizeof(ctl
));
442 ctl
.data
= str
.data();
444 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_CACHE
;
445 ctl
.io
.start
= timeout
;
446 ctl
.io
.size
= str
.size();
447 ctl
.io
.type
= id
.type
;
448 ctl
.io
.num
= str
.size();
450 memcpy(&ctl
.id
, &id
, sizeof(struct dnet_id
));
455 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
457 std::ostringstream string
;
458 string
<< dnet_dump_id(&id
) << ": WRITE: size: " << str
.size() << ", err: " << err
;
459 throw std::runtime_error(string
.str());
462 std::string
ret((const char *)result
, err
);
468 std::string
session::write_cache(const std::string
&key
, const std::string
&str
,
469 uint64_t cflags
, unsigned int ioflags
, long timeout
)
477 return write_cache(id
, str
, cflags
, ioflags
, timeout
);
480 std::string
session::write_compare_and_swap(const struct dnet_id
&id
, const std::string
&str
,
481 const dnet_id
&old_csum
, uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
) {
482 struct dnet_io_control ctl
;
484 memset(&ctl
, 0, sizeof(ctl
));
487 ctl
.data
= str
.data();
489 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_COMPARE_AND_SWAP
;
490 ctl
.io
.offset
= remote_offset
;
491 ctl
.io
.size
= str
.size();
492 ctl
.io
.type
= id
.type
;
493 ctl
.io
.num
= str
.size() + remote_offset
;
495 memcpy(&ctl
.id
, &id
, sizeof(struct dnet_id
));
496 memcpy(&ctl
.io
.parent
, &old_csum
.id
, DNET_ID_SIZE
);
501 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
503 std::ostringstream string
;
504 string
<< dnet_dump_id(&id
) << ": WRITE: size: " << str
.size() << ", err: " << err
;
505 throw std::runtime_error(string
.str());
508 std::string
ret((const char *)result
, err
);
514 std::string
session::write_compare_and_swap(const std::string
&remote
, const std::string
&str
, const struct dnet_id
&old_csum
,
515 uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
, int type
)
519 transform(remote
, id
);
523 return write_compare_and_swap(id
, str
, old_csum
, remote_offset
, cflags
, ioflags
);
526 std::string
session::write_data_wait(struct dnet_id
&id
, const std::string
&str
,
527 uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
)
529 struct dnet_io_control ctl
;
531 memset(&ctl
, 0, sizeof(ctl
));
534 ctl
.data
= str
.data();
536 ctl
.io
.flags
= ioflags
;
537 ctl
.io
.offset
= remote_offset
;
538 ctl
.io
.size
= str
.size();
539 ctl
.io
.type
= id
.type
;
540 ctl
.io
.num
= str
.size() + remote_offset
;
542 memcpy(&ctl
.id
, &id
, sizeof(struct dnet_id
));
547 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
549 std::ostringstream string
;
550 string
<< dnet_dump_id(&id
) << ": WRITE: size: " << str
.size() << ", err: " << err
;
551 throw std::runtime_error(string
.str());
554 std::string
ret((const char *)result
, err
);
560 std::string
session::write_data_wait(const std::string
&remote
, const std::string
&str
,
561 uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
, int type
)
565 transform(remote
, id
);
569 return write_data_wait(id
, str
, remote_offset
, cflags
, ioflags
);
572 std::string
session::lookup_addr(const std::string
&remote
, const int group_id
)
576 int err
= dnet_lookup_addr(m_session
, remote
.data(), remote
.size(), NULL
, group_id
, buf
, sizeof(buf
));
578 std::ostringstream str
;
579 str
<< "Failed to lookup in group " << group_id
<< ": key size: " << remote
.size() << ", err: " << err
;
580 throw std::runtime_error(str
.str());
583 return std::string((const char *)buf
, strlen(buf
));
586 std::string
session::lookup_addr(const struct dnet_id
&id
)
590 int err
= dnet_lookup_addr(m_session
, NULL
, 0, (struct dnet_id
*)&id
, id
.group_id
, buf
, sizeof(buf
));
592 std::ostringstream str
;
593 str
<< "Failed to lookup " << dnet_dump_id(&id
) << ": err: " << err
;
594 throw std::runtime_error(str
.str());
597 return std::string((const char *)buf
, strlen(buf
));
600 std::string
session::create_metadata(const struct dnet_id
&id
, const std::string
&obj
,
601 const std::vector
<int> &groups
, const struct timespec
&ts
)
603 struct dnet_metadata_control ctl
;
604 struct dnet_meta_container mc
;
607 memset(&mc
, 0, sizeof(struct dnet_meta_container
));
608 memset(&ctl
, 0, sizeof(struct dnet_metadata_control
));
610 ctl
.obj
= (char *)obj
.data();
611 ctl
.len
= obj
.size();
613 ctl
.groups
= (int *)&groups
[0];
614 ctl
.group_num
= groups
.size();
619 err
= dnet_create_metadata(m_session
, &ctl
, &mc
);
621 std::ostringstream str
;
622 str
<< "Failed to create metadata: key: " << dnet_dump_id(&id
) << ", err: " << err
;
623 throw std::runtime_error(str
.str());
629 ret
.assign((char *)mc
.data
, mc
.size
);
639 int session::write_metadata(const struct dnet_id
&id
, const std::string
&obj
,
640 const std::vector
<int> &groups
, const struct timespec
&ts
, uint64_t cflags
)
644 struct dnet_meta_container mc
;
646 if (dnet_flags(m_node
->m_node
) & DNET_CFG_NO_META
)
649 meta
= create_metadata(id
, obj
, groups
, ts
);
651 mc
.data
= (void *)meta
.data();
652 mc
.size
= meta
.size();
656 err
= dnet_write_metadata(m_session
, &mc
, 1, cflags
);
658 std::ostringstream str
;
659 str
<< "Failed to write metadata: key: " << dnet_dump_id(&id
) << ", err: " << err
;
660 throw std::runtime_error(str
.str());
666 void session::transform(const std::string
&data
, struct dnet_id
&id
)
668 dnet_transform(m_node
->m_node
, (void *)data
.data(), data
.size(), &id
);
671 void session::lookup(const struct dnet_id
&id
, const callback
&c
)
673 int err
= dnet_lookup_object(m_session
, (struct dnet_id
*)&id
, 0,
674 callback::complete_callback
,
678 std::ostringstream str
;
679 str
<< "Failed to lookup ID " << dnet_dump_id(&id
) << ": " << err
;
680 throw std::runtime_error(str
.str());
684 void session::lookup(const std::string
&data
, const callback
&c
)
687 int error
= -ENOENT
, i
, num
, *g
;
692 num
= dnet_mix_states(m_session
, &id
, &g
);
694 throw std::bad_alloc();
696 for (i
=0; i
<num
; ++i
) {
712 std::ostringstream str
;
713 str
<< "Failed to lookup data object: key: " << dnet_dump_id(&id
);
714 throw std::runtime_error(str
.str());
718 std::string
session::lookup(const std::string
&data
)
721 int error
= -ENOENT
, i
, num
, *g
;
727 num
= dnet_mix_states(m_session
, &id
, &g
);
729 throw std::bad_alloc();
731 for (i
=0; i
<num
; ++i
) {
739 if (ret
.size() < sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
)) {
740 std::stringstream str
;
742 str
<< dnet_dump_id(&id
) << ": failed to receive lookup request";
743 throw std::runtime_error(str
.str());
745 /* reply parsing example */
747 struct dnet_addr
*addr
= (struct dnet_addr
*)ret
.data();
748 struct dnet_cmd
*cmd
= (struct dnet_cmd
*)(addr
+ 1);
750 if (cmd
->size
> sizeof(struct dnet_addr_attr
)) {
751 struct dnet_addr_attr
*a
= (struct dnet_addr_attr
*)(cmd
+ 1);
752 struct dnet_file_info
*info
= (struct dnet_file_info
*)(a
+ 1);
754 dnet_convert_addr_attr(a
);
755 dnet_convert_file_info(info
);
758 dnet_log_raw(m_node
->m_node
, DNET_LOG_DEBUG
, "%s: %s: %zu bytes\n", dnet_dump_id(&id
), data
.c_str(), ret
.size());
761 } catch (const std::exception
&e
) {
762 dnet_log_raw(m_node
->m_node
, DNET_LOG_ERROR
, "%s: %s : %s\n", dnet_dump_id(&id
), e
.what(), data
.c_str());
770 std::ostringstream str
;
771 str
<< data
<< ": could not find object";
773 throw std::runtime_error(str
.str());
779 std::string
session::lookup(const struct dnet_id
&id
)
790 if (ret
.size() < sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
)) {
791 std::stringstream str
;
793 str
<< dnet_dump_id(&id
) << ": failed to receive lookup request";
794 throw std::runtime_error(str
.str());
797 dnet_log_raw(m_node
->m_node
, DNET_LOG_DEBUG
, "%s: %zu bytes\n", dnet_dump_id(&id
), ret
.size());
799 } catch (const std::exception
&e
) {
800 dnet_log_raw(m_node
->m_node
, DNET_LOG_ERROR
, "%s: %s\n", dnet_dump_id(&id
), e
.what());
804 std::ostringstream str
;
805 str
<< dnet_dump_id(&id
) << ": could not find object";
807 throw std::runtime_error(str
.str());
813 void session::remove_raw(struct dnet_id
&id
, uint64_t cflags
, uint64_t ioflags
)
816 std::vector
<int> g
= groups
;
818 for (int i
=0; i
<(int)g
.size(); ++i
) {
821 if (!dnet_remove_object_now(m_session
, &id
, cflags
, ioflags
))
826 std::ostringstream str
;
827 str
<< dnet_dump_id(&id
) << ": REMOVE: " << err
;
828 throw std::runtime_error(str
.str());
832 void session::remove(struct dnet_id
&id
)
834 remove_raw(id
, 0, 0);
837 void session::remove_raw(const std::string
&data
, int type
, uint64_t cflags
, uint64_t ioflags
)
844 remove_raw(id
, cflags
, ioflags
);
847 void session::remove(const std::string
&data
, int type
)
849 remove_raw(data
, type
, 0, 0);
852 std::string
session::stat_log()
858 err
= dnet_request_stat(m_session
, NULL
, DNET_CMD_STAT
, 0,
859 callback::complete_callback
, (void *)&c
);
861 std::ostringstream str
;
862 str
<< "Failed to request statistics: " << err
;
863 throw std::runtime_error(str
.str());
868 /* example reply parsing */
871 const void *data
= ret
.data();
872 int size
= ret
.size();
873 char id_str
[DNET_ID_SIZE
*2 + 1];
877 struct dnet_addr
*addr
= (struct dnet_addr
*)data
;
878 struct dnet_cmd
*cmd
= (struct dnet_cmd
*)(addr
+ 1);
879 struct dnet_stat
*st
= (struct dnet_stat
*)(cmd
+ 1);
881 dnet_convert_stat(st
);
883 la
[0] = (float)st
->la
[0] / 100.0;
884 la
[1] = (float)st
->la
[1] / 100.0;
885 la
[2] = (float)st
->la
[2] / 100.0;
887 printf("<stat addr=\"%s\" id=\"%s\"><la>%.2f %.2f %.2f</la>"
888 "<memtotal>%llu KB</memtotal><memfree>%llu KB</memfree><memcached>%llu KB</memcached>"
889 "<storage_size>%llu MB</storage_size><available_size>%llu MB</available_size>"
890 "<files>%llu</files><fsid>0x%llx</fsid></stat>",
891 dnet_server_convert_dnet_addr_raw(addr
, addr_str
, sizeof(addr_str
)),
892 dnet_dump_id_len_raw(cmd
->id
.id
, DNET_ID_SIZE
, id_str
),
894 (unsigned long long)st
->vm_total
,
895 (unsigned long long)st
->vm_free
,
896 (unsigned long long)st
->vm_cached
,
897 (unsigned long long)(st
->frsize
* st
->blocks
/ 1024 / 1024),
898 (unsigned long long)(st
->bavail
* st
->bsize
/ 1024 / 1024),
899 (unsigned long long)st
->files
, (unsigned long long)st
->fsid
);
902 int sz
= sizeof(*addr
) + sizeof(*cmd
) + cmd
->size
;
909 if (ret
.size() < sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
) + sizeof(struct dnet_stat
))
910 throw std::runtime_error("Failed to request statistics: not enough data returned");
914 int session::state_num(void)
916 return dnet_state_num(m_session
);
919 int session::request_cmd(struct dnet_trans_control
&ctl
)
923 err
= dnet_request_cmd(m_session
, &ctl
);
925 std::ostringstream str
;
926 str
<< dnet_dump_id(&ctl
.id
) << ": failed to request cmd: " << dnet_cmd_string(ctl
.cmd
) << ": " << err
;
927 throw std::runtime_error(str
.str());
933 void session::update_status(const char *saddr
, const int port
, const int family
, struct dnet_node_status
*status
)
936 struct dnet_addr addr
;
939 memset(&addr
, 0, sizeof(addr
));
940 addr
.addr_len
= sizeof(addr
.addr
);
942 snprintf(sport
, sizeof(sport
), "%d", port
);
944 err
= dnet_fill_addr(&addr
, saddr
, sport
, family
, SOCK_STREAM
, IPPROTO_TCP
);
946 err
= dnet_update_status(m_session
, &addr
, NULL
, status
);
949 std::ostringstream str
;
950 str
<< saddr
<< ":" << port
<< ": failed to request set status " << std::hex
<< status
<< ": " << err
;
951 throw std::runtime_error(str
.str());
955 void session::update_status(struct dnet_id
&id
, struct dnet_node_status
*status
)
959 err
= dnet_update_status(m_session
, NULL
, &id
, status
);
961 std::ostringstream str
;
963 str
<< dnet_dump_id(&id
) << ": failed to request set status " << std::hex
<< status
<< ": " << err
;
964 throw std::runtime_error(str
.str());
968 struct range_sort_compare
{
969 bool operator () (const std::string
&s1
, const std::string
&s2
) {
970 unsigned char *id1
= (unsigned char *)s1
.data();
971 unsigned char *id2
= (unsigned char *)s2
.data();
973 int cmp
= dnet_id_cmp_str(id1
, id2
);
979 std::vector
<std::string
> session::read_data_range(struct dnet_io_attr
&io
, int group_id
, uint64_t cflags
)
981 struct dnet_range_data
*data
;
983 uint32_t ioflags
= io
.flags
;
986 data
= dnet_read_range(m_session
, &io
, group_id
, cflags
, &err
);
988 std::ostringstream str
;
989 str
<< "Failed to read range data object: group: " << group_id
<<
990 ", key: " << dnet_dump_id_str(io
.id
) <<
991 ", size: " << io
.size
<< ": err: " << strerror(-err
) << ": " << err
;
992 throw std::runtime_error(str
.str());
995 std::vector
<std::string
> ret
;
999 for (int i
= 0; i
< err
; ++i
) {
1000 struct dnet_range_data
*d
= &data
[i
];
1001 char *data
= (char *)d
->data
;
1003 if (!(ioflags
& DNET_IO_FLAGS_NODATA
)) {
1004 while (d
->size
> sizeof(struct dnet_io_attr
)) {
1005 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)data
;
1007 dnet_convert_io_attr(io
);
1011 if (sizeof(struct dnet_io_attr
) + io
->size
> d
->size
)
1013 std::ostringstream str
;
1014 str
<< "read_data_range: incorrect data size: d->size = "
1015 << d
->size
<< " io->size = "
1017 throw std::runtime_error(str
.str());
1020 str
.append((char *)io
->id
, DNET_ID_SIZE
);
1021 str
.append((char *)&io
->size
, sizeof(io
->size
));
1022 str
.append((const char *)(io
+ 1), io
->size
);
1026 data
+= sizeof(struct dnet_io_attr
) + io
->size
;
1027 d
->size
-= sizeof(struct dnet_io_attr
) + io
->size
;
1030 if (d
->size
!= sizeof(struct dnet_io_attr
)) {
1031 std::ostringstream str
;
1032 str
<< "Incorrect data size: d->size = " << d
->size
<<
1033 "sizeof = " << sizeof(struct dnet_io_attr
);
1034 throw std::runtime_error(str
.str());
1036 struct dnet_io_attr
*rep
= (struct dnet_io_attr
*)data
;
1040 for (int i
= 0; i
< err
; ++i
) {
1041 struct dnet_range_data
*d
= &data
[i
];
1045 } catch (const std::exception
& e
) {
1046 for (int i
= 0; i
< err
; ++i
) {
1047 struct dnet_range_data
*d
= &data
[i
];
1053 if (ioflags
& DNET_IO_FLAGS_NODATA
) {
1054 std::ostringstream str
;
1056 ret
.push_back(str
.str());
1063 std::vector
<struct dnet_io_attr
> session::remove_data_range(struct dnet_io_attr
&io
, int group_id
, uint64_t cflags
)
1065 struct dnet_io_attr
*retp
;
1069 retp
= dnet_remove_range(m_session
, &io
, group_id
, cflags
, &ret_num
, &err
);
1072 std::ostringstream str
;
1073 str
<< "Failed to read range data object: group: " << group_id
<<
1074 ", key: " << dnet_dump_id_str(io
.id
) <<
1075 ", size: " << io
.size
<< ": err: " << strerror(-err
) << ": " << err
;
1076 throw std::runtime_error(str
.str());
1079 std::vector
<struct dnet_io_attr
> ret
;;
1082 for (int i
= 0; i
< ret_num
; ++i
) {
1083 ret
.push_back(retp
[i
]);
1092 std::string
session::write_prepare(const struct dnet_id
&id
, const std::string
&str
, uint64_t remote_offset
,
1093 uint64_t psize
, uint64_t cflags
, unsigned int ioflags
)
1095 struct dnet_io_control ctl
;
1097 memset(&ctl
, 0, sizeof(ctl
));
1099 ctl
.cflags
= cflags
;
1100 ctl
.data
= str
.data();
1102 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_PREPARE
| DNET_IO_FLAGS_PLAIN_WRITE
;
1103 ctl
.io
.offset
= remote_offset
;
1104 ctl
.io
.size
= str
.size();
1105 ctl
.io
.type
= id
.type
;
1108 memcpy(&ctl
.id
, &id
, sizeof(id
));
1112 char *result
= NULL
;
1113 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
1115 std::ostringstream string
;
1116 string
<< dnet_dump_id(&ctl
.id
) << ": write_prepare: size: " << str
.size() << ", err: " << err
;
1117 throw std::runtime_error(string
.str());
1120 std::string
ret(result
, err
);
1126 std::string
session::write_prepare(const std::string
&remote
, const std::string
&str
, uint64_t remote_offset
,
1127 uint64_t psize
, uint64_t cflags
, unsigned int ioflags
, int type
)
1130 memset(&id
, 0, sizeof(id
));
1131 transform(remote
, id
);
1133 return write_prepare(id
, str
, remote_offset
, psize
, cflags
, ioflags
);
1136 std::string
session::write_commit(const struct dnet_id
&id
, const std::string
&str
, uint64_t remote_offset
, uint64_t csize
,
1137 uint64_t cflags
, unsigned int ioflags
)
1139 struct dnet_io_control ctl
;
1141 memset(&ctl
, 0, sizeof(ctl
));
1143 ctl
.cflags
= cflags
;
1144 ctl
.data
= str
.data();
1146 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_COMMIT
| DNET_IO_FLAGS_PLAIN_WRITE
;
1147 ctl
.io
.offset
= remote_offset
;
1148 ctl
.io
.size
= str
.size();
1149 ctl
.io
.type
= id
.type
;
1152 memcpy(&ctl
.id
, &id
, sizeof(id
));
1156 char *result
= NULL
;
1157 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
1159 std::ostringstream string
;
1160 string
<< dnet_dump_id(&ctl
.id
) << ": write_commit: size: " << str
.size() << ", err: " << err
;
1161 throw std::runtime_error(string
.str());
1164 std::string
ret(result
, err
);
1170 std::string
session::write_commit(const std::string
&remote
, const std::string
&str
, uint64_t remote_offset
, uint64_t csize
,
1171 uint64_t cflags
, unsigned int ioflags
, int type
)
1174 memset(&id
, 0, sizeof(id
));
1175 transform(remote
, id
);
1177 return write_commit(id
, str
, remote_offset
, csize
, cflags
, ioflags
);
1180 std::string
session::write_plain(const struct dnet_id
&id
, const std::string
&str
, uint64_t remote_offset
,
1181 uint64_t cflags
, unsigned int ioflags
)
1183 struct dnet_io_control ctl
;
1185 memset(&ctl
, 0, sizeof(ctl
));
1187 ctl
.cflags
= cflags
;
1188 ctl
.data
= str
.data();
1190 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_PLAIN_WRITE
;
1191 ctl
.io
.offset
= remote_offset
;
1192 ctl
.io
.size
= str
.size();
1193 ctl
.io
.type
= id
.type
;
1195 memcpy(&ctl
.id
, &id
, sizeof(id
));
1199 char *result
= NULL
;
1200 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
1202 std::ostringstream string
;
1203 string
<< dnet_dump_id(&ctl
.id
) << ": write_plain: size: " << str
.size() << ", err: " << err
;
1204 throw std::runtime_error(string
.str());
1207 std::string
ret(result
, err
);
1213 std::string
session::write_plain(const std::string
&remote
, const std::string
&str
, uint64_t remote_offset
,
1214 uint64_t cflags
, unsigned int ioflags
, int type
)
1217 memset(&id
, 0, sizeof(id
));
1218 transform(remote
, id
);
1220 return write_plain(id
, str
, remote_offset
, cflags
, ioflags
);
1223 std::vector
<std::pair
<struct dnet_id
, struct dnet_addr
> > session::get_routes()
1225 std::vector
<std::pair
<struct dnet_id
, struct dnet_addr
> > res
;
1226 struct dnet_id
*ids
= NULL
;
1227 struct dnet_addr
*addrs
= NULL
;
1231 count
= dnet_get_routes(m_session
, &ids
, &addrs
);
1234 for (int i
= 0; i
< count
; ++i
) {
1235 res
.push_back(std::make_pair(ids
[i
], addrs
[i
]));
1248 std::string
session::request(struct dnet_id
*id
, struct sph
*sph
, bool lock
)
1250 std::string ret_str
;
1256 err
= dnet_send_cmd(m_session
, id
, sph
, &ret
);
1258 err
= dnet_send_cmd_nolock(m_session
, id
, sph
, &ret
);
1261 std::ostringstream str
;
1263 str
<< dnet_dump_id(id
) << ": failed to send request: " << strerror(-err
) << ": " << err
;
1264 throw std::runtime_error(str
.str());
1269 ret_str
.assign((char *)ret
, err
);
1280 std::string
session::raw_exec(struct dnet_id
*id
, const struct sph
*orig_sph
,
1281 const std::string
&event
, const std::string
&data
, const std::string
&binary
, bool lock
)
1283 std::vector
<char> vec(event
.size() + data
.size() + binary
.size() + sizeof(struct sph
));
1284 std::string ret_str
;
1286 struct sph
*sph
= (struct sph
*)&vec
[0];
1288 memset(sph
, 0, sizeof(struct sph
));
1291 sph
->flags
&= ~DNET_SPH_FLAGS_SRC_BLOCK
;
1293 sph
->flags
= DNET_SPH_FLAGS_SRC_BLOCK
;
1294 memcpy(sph
->src
.id
, id
->id
, sizeof(sph
->src
.id
));
1297 sph
->data_size
= data
.size();
1298 sph
->binary_size
= binary
.size();
1299 sph
->event_size
= event
.size();
1301 memcpy(sph
->data
, event
.data(), event
.size());
1302 memcpy(sph
->data
+ event
.size(), data
.data(), data
.size());
1303 memcpy(sph
->data
+ event
.size() + data
.size(), binary
.data(), binary
.size());
1305 return request(id
, sph
, lock
);
1308 std::string
session::exec_locked(struct dnet_id
*id
, const std::string
&event
, const std::string
&data
, const std::string
&binary
)
1310 return raw_exec(id
, NULL
, event
, data
, binary
, true);
1313 std::string
session::exec_unlocked(struct dnet_id
*id
, const std::string
&event
, const std::string
&data
, const std::string
&binary
)
1315 return raw_exec(id
, NULL
, event
, data
, binary
, false);
1318 std::string
session::push_locked(struct dnet_id
*id
, const struct sph
&sph
, const std::string
&event
,
1319 const std::string
&data
, const std::string
&binary
)
1321 return raw_exec(id
, &sph
, event
, data
, binary
, true);
1324 std::string
session::push_unlocked(struct dnet_id
*id
, const struct sph
&sph
, const std::string
&event
,
1325 const std::string
&data
, const std::string
&binary
)
1327 return raw_exec(id
, &sph
, event
, data
, binary
, false);
1330 void session::reply(const struct sph
&orig_sph
, const std::string
&event
, const std::string
&data
, const std::string
&binary
)
1332 std::vector
<char> vec(event
.size() + data
.size() + binary
.size() + sizeof(struct sph
));
1333 std::string ret_str
;
1335 struct sph
*sph
= (struct sph
*)&vec
[0];
1339 sph
->data_size
= data
.size();
1340 sph
->binary_size
= binary
.size();
1341 sph
->event_size
= event
.size();
1343 memcpy(sph
->data
, event
.data(), event
.size());
1344 memcpy(sph
->data
+ event
.size(), data
.data(), data
.size());
1345 memcpy(sph
->data
+ event
.size() + data
.size(), binary
.data(), binary
.size());
1348 dnet_setup_id(&id
, 0, sph
->src
.id
);
1351 request(&id
, sph
, false);
1355 bool dnet_io_attr_compare(const struct dnet_io_attr
&io1
, const struct dnet_io_attr
&io2
) {
1358 cmp
= dnet_id_cmp_str(io1
.id
, io2
.id
);
1363 std::vector
<std::string
> session::bulk_read(const std::vector
<struct dnet_io_attr
> &ios
, uint64_t cflags
)
1365 struct dnet_range_data
*data
;
1368 num
= dnet_mix_states(m_session
, NULL
, &g
);
1370 throw std::runtime_error("could not fetch groups: " + std::string(strerror(num
)));
1372 std::vector
<int> groups
;
1374 groups
.assign(g
, g
+ num
);
1381 std::vector
<struct dnet_io_attr
> tmp_ios
= ios
;
1382 std::sort(tmp_ios
.begin(), tmp_ios
.end(), dnet_io_attr_compare
);
1384 std::vector
<std::string
> ret
;
1386 for (std::vector
<int>::iterator group
= groups
.begin(); group
!= groups
.end(); ++group
) {
1387 if (!tmp_ios
.size())
1390 data
= dnet_bulk_read(m_session
, (struct dnet_io_attr
*)(&tmp_ios
[0]), tmp_ios
.size(), *group
, cflags
, &err
);
1392 std::ostringstream str
;
1393 str
<< "Failed to read bulk data: group: " << *group
<<
1394 ": err: " << strerror(-err
) << ": " << err
;
1395 throw std::runtime_error(str
.str());
1399 for (int i
= 0; i
< err
; ++i
) {
1400 struct dnet_range_data
*d
= &data
[i
];
1401 char *data
= (char *)d
->data
;
1404 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)data
;
1406 for (std::vector
<struct dnet_io_attr
>::iterator it
= tmp_ios
.begin(); it
!= tmp_ios
.end(); ++it
) {
1407 int cmp
= dnet_id_cmp_str(it
->id
, io
->id
);
1415 dnet_convert_io_attr(io
);
1417 uint64_t size
= dnet_bswap64(io
->size
);
1421 str
.append((char *)io
->id
, DNET_ID_SIZE
);
1422 str
.append((char *)&size
, 8);
1423 str
.append((const char *)(io
+ 1), io
->size
);
1427 data
+= sizeof(struct dnet_io_attr
) + io
->size
;
1428 d
->size
-= sizeof(struct dnet_io_attr
) + io
->size
;
1441 std::vector
<std::string
> session::bulk_read(const std::vector
<std::string
> &keys
, uint64_t cflags
)
1443 std::vector
<struct dnet_io_attr
> ios
;
1444 struct dnet_io_attr io
;
1445 memset(&io
, 0, sizeof(io
));
1447 ios
.reserve(keys
.size());
1449 for (size_t i
= 0; i
< keys
.size(); ++i
) {
1452 transform(keys
[i
], id
);
1453 memcpy(io
.id
, id
.id
, sizeof(io
.id
));
1457 return bulk_read(ios
, cflags
);
1460 std::string
session::bulk_write(const std::vector
<struct dnet_io_attr
> &ios
, const std::vector
<std::string
> &data
, uint64_t cflags
)
1462 std::vector
<struct dnet_io_control
> ctls
;
1466 if (ios
.size() != data
.size()) {
1467 std::ostringstream string
;
1468 string
<< "BULK_WRITE: ios doesn't meet data: io.size: " << ios
.size() << ", data.size: " << data
.size();
1469 throw std::runtime_error(string
.str());
1472 ctls
.reserve(ios
.size());
1474 for(i
= 0; i
< ios
.size(); ++i
) {
1475 struct dnet_io_control ctl
;
1476 memset(&ctl
, 0, sizeof(ctl
));
1478 ctl
.cflags
= cflags
;
1479 ctl
.data
= data
[i
].data();
1483 dnet_setup_id(&ctl
.id
, 0, (unsigned char *)ios
[i
].id
);
1484 ctl
.id
.type
= ios
[i
].type
;
1488 ctls
.push_back(ctl
);
1491 struct dnet_range_data ret
= dnet_bulk_write(m_session
, &ctls
[0], ctls
.size(), &err
);
1493 std::ostringstream string
;
1494 string
<< "BULK_WRITE: size: " << ret
.size
<< ", err: " << err
;
1495 throw std::runtime_error(string
.str());
1498 std::string
ret_str((const char *)ret
.data
, ret
.size
);
1504 struct dnet_node
* session::get_node()
1506 return m_node
->m_node
;