2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #define _XOPEN_SOURCE 600
18 #include <sys/types.h>
20 #include <sys/socket.h>
41 #include <boost/algorithm/string.hpp>
42 #include <elliptics/cppdef.h>
44 using namespace ioremap::elliptics
;
46 node::node(logger
&l
) : m_node(NULL
), m_log(NULL
)
48 struct dnet_config cfg
;
50 memset(&cfg
, 0, sizeof(cfg
));
52 cfg
.sock_type
= SOCK_STREAM
;
53 cfg
.proto
= IPPROTO_TCP
;
55 cfg
.check_timeout
= 20;
57 m_log
= reinterpret_cast<logger
*>(l
.clone());
58 cfg
.log
= m_log
->get_dnet_log();
60 snprintf(cfg
.addr
, sizeof(cfg
.addr
), "0.0.0.0");
61 snprintf(cfg
.port
, sizeof(cfg
.port
), "0");
63 m_node
= dnet_node_create(&cfg
);
66 throw std::bad_alloc();
70 node::node(logger
&l
, struct dnet_config
&cfg
) : m_node(NULL
), m_log(NULL
)
72 cfg
.sock_type
= SOCK_STREAM
;
73 cfg
.proto
= IPPROTO_TCP
;
75 m_log
= reinterpret_cast<logger
*>(l
.clone());
76 cfg
.log
= m_log
->get_dnet_log();
78 snprintf(cfg
.addr
, sizeof(cfg
.addr
), "0.0.0.0");
79 snprintf(cfg
.port
, sizeof(cfg
.port
), "0");
81 m_node
= dnet_node_create(&cfg
);
84 throw std::bad_alloc();
88 node::node(logger
&l
, const std::string
&config_path
) : m_node(NULL
), m_log(NULL
)
90 struct dnet_config cfg
;
91 memset(&cfg
, 0, sizeof(struct dnet_config
));
93 cfg
.sock_type
= SOCK_STREAM
;
94 cfg
.proto
= IPPROTO_TCP
;
96 m_log
= reinterpret_cast<logger
*>(l
.clone());
97 cfg
.log
= m_log
->get_dnet_log();
99 std::list
<addr_tuple
> remotes
;
100 std::vector
<int> groups
;
102 parse_config(config_path
, cfg
, remotes
, groups
, cfg
.log
->log_level
);
104 m_node
= dnet_node_create(&cfg
);
107 throw std::bad_alloc();
110 for (std::list
<addr_tuple
>::iterator it
= remotes
.begin(); it
!= remotes
.end(); ++it
) {
112 add_remote(it
->host
.c_str(), it
->port
, it
->family
);
121 dnet_node_destroy(m_node
);
125 session::session(node
&n
) : m_node(&n
)
127 m_session
= dnet_session_create(m_node
->m_node
);
130 throw std::bad_alloc();
137 void node::parse_config(const std::string
&path
, struct dnet_config
&cfg
,
138 std::list
<addr_tuple
> &remotes
,
139 std::vector
<int> &groups
,
142 std::ifstream
in(path
.c_str());
146 while (!in
.eof() && in
.good()) {
149 in
.getline((char *)line
.data(), line
.size());
150 size_t len
= in
.gcount();
154 if (in
.eof() || !in
.good())
160 if (line
.size() < 3 || line
.data()[0] == '#')
163 std::vector
<std::string
> strs
;
164 boost::split(strs
, line
, boost::is_any_of("="));
166 std::string key
= strs
[0];
169 if (strs
.size() != 2) {
170 std::ostringstream str
;
171 str
<< path
<< ": invalid elliptics config: line: " << line_num
<<
172 ", key: " << key
<< "': string is broken: size: " << strs
.size();
173 throw std::runtime_error(str
.str());
175 std::string value
= strs
[1];
178 if (key
== "remote") {
179 std::vector
<std::string
> rem
;
180 boost::split(rem
, value
, boost::is_any_of(" "));
182 for (std::vector
<std::string
>::iterator it
= rem
.begin(); it
!= rem
.end(); ++it
) {
183 std::string addr_str
= *it
;
184 if (dnet_parse_addr((char *)addr_str
.c_str(), &cfg
)) {
185 std::ostringstream str
;
186 str
<< path
<< ": invalid elliptics config: '" << key
<< "' ";
187 str
<< path
<< ": invalid elliptics config: line: " << line_num
<<
188 ", key: '" << key
<< "': remote addr is invalid";
189 throw std::runtime_error(str
.str());
192 addr_tuple
addr(cfg
.addr
, atoi(cfg
.port
), cfg
.family
);
193 remotes
.push_back(addr
);
197 if (key
== "groups") {
198 std::vector
<std::string
> gr
;
199 boost::split(gr
, value
, boost::is_any_of(":"));
201 for (std::vector
<std::string
>::iterator it
= gr
.begin(); it
!= gr
.end(); ++it
) {
202 int group
= atoi(it
->c_str());
205 groups
.push_back(group
);
209 if (key
== "check_timeout")
210 cfg
.check_timeout
= strtoul(value
.c_str(), NULL
, 0);
211 if (key
== "wait_timeout")
212 cfg
.wait_timeout
= strtoul(value
.c_str(), NULL
, 0);
213 if (key
== "log_level")
214 log_level
= strtoul(value
.c_str(), NULL
, 0);
218 void node::add_remote(const char *addr
, const int port
, const int family
)
220 struct dnet_config cfg
;
223 memset(&cfg
, 0, sizeof(cfg
));
226 snprintf(cfg
.addr
, sizeof(cfg
.addr
), "%s", addr
);
227 snprintf(cfg
.port
, sizeof(cfg
.port
), "%d", port
);
229 err
= dnet_add_state(m_node
, &cfg
);
231 std::ostringstream str
;
232 str
<< "Failed to add remote addr " << addr
<< ":" << port
<< ": " << err
;
233 throw std::runtime_error(str
.str());
237 void node::set_timeouts(const int wait_timeout
, const int check_timeout
)
239 dnet_set_timeouts(m_node
, wait_timeout
, check_timeout
);
242 void session::add_groups(std::vector
<int> &groups
)
244 if (dnet_session_set_groups(m_session
, (int *)&groups
[0], groups
.size()))
245 throw std::bad_alloc();
246 this->groups
= groups
;
249 void session::read_file(struct dnet_id
&id
, const std::string
&file
, uint64_t offset
, uint64_t size
)
253 err
= dnet_read_file_id(m_session
, file
.c_str(), &id
, offset
, size
);
255 std::ostringstream str
;
256 str
<< dnet_dump_id(&id
) << ": READ: " << file
<< ": offset: " << offset
<< ", size: " << size
<< ": " << err
;
257 throw std::runtime_error(str
.str());
261 void session::read_file(const std::string
&remote
, const std::string
&file
, uint64_t offset
, uint64_t size
, int type
)
265 err
= dnet_read_file(m_session
, file
.c_str(), remote
.data(), remote
.size(), offset
, size
, type
);
268 transform(remote
, id
);
271 std::ostringstream str
;
272 str
<< dnet_dump_id(&id
) << ": READ: " << file
<< ": offset: " << offset
<< ", size: " << size
<< ": " << err
;
273 throw std::runtime_error(str
.str());
277 void session::write_file(struct dnet_id
&id
, const std::string
&file
, uint64_t local_offset
,
278 uint64_t offset
, uint64_t size
, uint64_t cflags
, unsigned int ioflags
)
280 int err
= dnet_write_file_id(m_session
, file
.c_str(), &id
, local_offset
, offset
, size
, cflags
, ioflags
);
282 std::ostringstream str
;
283 str
<< dnet_dump_id(&id
) << ": WRITE: " << file
<< ", local_offset: " << local_offset
<<
284 ", offset: " << offset
<< ", size: " << size
<< ": " << err
;
285 throw std::runtime_error(str
.str());
288 void session::write_file(const std::string
&remote
, const std::string
&file
, uint64_t local_offset
, uint64_t offset
, uint64_t size
,
289 uint64_t cflags
, unsigned int ioflags
, int type
)
291 int err
= dnet_write_file(m_session
, file
.c_str(), remote
.data(), remote
.size(),
292 local_offset
, offset
, size
, cflags
, ioflags
, type
);
295 transform(remote
, id
);
298 std::ostringstream str
;
299 str
<< dnet_dump_id(&id
) << ": WRITE: " << file
<< ", local_offset: " << local_offset
<<
300 ", offset: " << offset
<< ", size: " << size
<< ": " << err
;
301 throw std::runtime_error(str
.str());
305 std::string
session::read_data_wait(struct dnet_id
&id
, uint64_t offset
, uint64_t size
,
306 uint64_t cflags
, uint32_t ioflags
)
308 struct dnet_io_attr io
;
311 memset(&io
, 0, sizeof(io
));
317 memcpy(io
.id
, id
.id
, DNET_ID_SIZE
);
318 memcpy(io
.parent
, id
.id
, DNET_ID_SIZE
);
320 void *data
= dnet_read_data_wait(m_session
, &id
, &io
, cflags
, &err
);
322 std::ostringstream str
;
323 str
<< dnet_dump_id(&id
) << ": READ: size: " << size
<< ": err: " << strerror(-err
) << ": " << err
;
324 throw std::runtime_error(str
.str());
327 std::string ret
= std::string((const char *)data
+ sizeof(struct dnet_io_attr
), io
.size
- sizeof(struct dnet_io_attr
));
333 std::string
session::read_data_wait(const std::string
&remote
, uint64_t offset
, uint64_t size
,
334 uint64_t cflags
, uint32_t ioflags
, int type
)
338 transform(remote
, id
);
341 return read_data_wait(id
, offset
, size
, cflags
, ioflags
);
344 void session::prepare_latest(struct dnet_id
&id
, uint64_t cflags
, std::vector
<int> &groups
)
346 struct dnet_read_latest_prepare pr
;
349 memset(&pr
, 0, sizeof(struct dnet_read_latest_prepare
));
355 pr
.group
= (int *)malloc(groups
.size() * sizeof(int));
357 std::ostringstream str
;
359 str
<< dnet_dump_id(&id
) << ": prepare_latest: allocation failure: group num: " << groups
.size();
360 throw std::runtime_error(str
.str());
362 pr
.group_num
= groups
.size();
364 for (unsigned i
= 0; i
< groups
.size(); ++i
)
365 pr
.group
[i
] = groups
[i
];
367 err
= dnet_read_latest_prepare(&pr
);
372 for (int i
= 0; i
< pr
.group_num
; ++i
)
373 groups
.push_back(pr
.group
[i
]);
386 std::ostringstream str
;
388 str
<< dnet_dump_id(&id
) << ": prepare_latest: groups: " << groups
.size() << ": err: " << strerror(-err
) << ": " << err
;
389 throw std::runtime_error(str
.str());
393 std::string
session::read_latest(struct dnet_id
&id
, uint64_t offset
, uint64_t size
,
394 uint64_t cflags
, uint32_t ioflags
)
396 struct dnet_io_attr io
;
400 memset(&io
, 0, sizeof(io
));
405 io
.num
= groups
.size();
407 memcpy(io
.id
, id
.id
, DNET_ID_SIZE
);
408 memcpy(io
.parent
, id
.id
, DNET_ID_SIZE
);
410 err
= dnet_read_latest(m_session
, &id
, &io
, cflags
, &data
);
412 std::ostringstream str
;
413 str
<< dnet_dump_id(&id
) << ": READ: size: " << size
<< ": err: " << strerror(-err
) << ": " << err
;
414 throw std::runtime_error(str
.str());
417 std::string ret
= std::string((const char *)data
+ sizeof(struct dnet_io_attr
), io
.size
- sizeof(struct dnet_io_attr
));
423 std::string
session::read_latest(const std::string
&remote
, uint64_t offset
, uint64_t size
,
424 uint64_t cflags
, uint32_t ioflags
, int type
)
428 transform(remote
, id
);
431 return read_latest(id
, offset
, size
, cflags
, ioflags
);
434 std::string
session::write_cache(struct dnet_id
&id
, const std::string
&str
,
435 uint64_t cflags
, unsigned int ioflags
, long timeout
)
437 struct dnet_io_control ctl
;
439 memset(&ctl
, 0, sizeof(ctl
));
442 ctl
.data
= str
.data();
444 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_CACHE
;
445 ctl
.io
.start
= timeout
;
446 ctl
.io
.size
= str
.size();
447 ctl
.io
.type
= id
.type
;
448 ctl
.io
.num
= str
.size();
450 memcpy(&ctl
.id
, &id
, sizeof(struct dnet_id
));
455 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
457 std::ostringstream string
;
458 string
<< dnet_dump_id(&id
) << ": WRITE: size: " << str
.size() << ", err: " << err
;
459 throw std::runtime_error(string
.str());
462 std::string
ret((const char *)result
, err
);
468 std::string
session::write_cache(const std::string
&key
, const std::string
&str
,
469 uint64_t cflags
, unsigned int ioflags
, long timeout
)
477 return write_cache(id
, str
, cflags
, ioflags
, timeout
);
480 std::string
session::write_compare_and_swap(const struct dnet_id
&id
, const std::string
&str
,
481 const dnet_id
&old_csum
, uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
) {
482 struct dnet_io_control ctl
;
484 memset(&ctl
, 0, sizeof(ctl
));
487 ctl
.data
= str
.data();
489 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_COMPARE_AND_SWAP
;
490 ctl
.io
.offset
= remote_offset
;
491 ctl
.io
.size
= str
.size();
492 ctl
.io
.type
= id
.type
;
493 ctl
.io
.num
= str
.size() + remote_offset
;
495 memcpy(&ctl
.id
, &id
, sizeof(struct dnet_id
));
496 memcpy(&ctl
.io
.parent
, &old_csum
.id
, DNET_ID_SIZE
);
501 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
503 std::ostringstream string
;
504 string
<< dnet_dump_id(&id
) << ": WRITE: size: " << str
.size() << ", err: " << err
;
505 throw std::runtime_error(string
.str());
508 std::string
ret((const char *)result
, err
);
514 std::string
session::write_compare_and_swap(const std::string
&remote
, const std::string
&str
, const struct dnet_id
&old_csum
,
515 uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
, int type
)
519 transform(remote
, id
);
523 return write_compare_and_swap(id
, str
, old_csum
, remote_offset
, cflags
, ioflags
);
526 std::string
session::write_data_wait(struct dnet_id
&id
, const std::string
&str
,
527 uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
)
529 struct dnet_io_control ctl
;
531 memset(&ctl
, 0, sizeof(ctl
));
534 ctl
.data
= str
.data();
536 ctl
.io
.flags
= ioflags
;
537 ctl
.io
.offset
= remote_offset
;
538 ctl
.io
.size
= str
.size();
539 ctl
.io
.type
= id
.type
;
540 ctl
.io
.num
= str
.size() + remote_offset
;
542 memcpy(&ctl
.id
, &id
, sizeof(struct dnet_id
));
547 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
549 std::ostringstream string
;
550 string
<< dnet_dump_id(&id
) << ": WRITE: size: " << str
.size() << ", err: " << err
;
551 throw std::runtime_error(string
.str());
554 std::string
ret((const char *)result
, err
);
560 std::string
session::write_data_wait(const std::string
&remote
, const std::string
&str
,
561 uint64_t remote_offset
, uint64_t cflags
, unsigned int ioflags
, int type
)
565 transform(remote
, id
);
569 return write_data_wait(id
, str
, remote_offset
, cflags
, ioflags
);
572 std::string
session::lookup_addr(const std::string
&remote
, const int group_id
)
576 int err
= dnet_lookup_addr(m_session
, remote
.data(), remote
.size(), NULL
, group_id
, buf
, sizeof(buf
));
578 std::ostringstream str
;
579 str
<< "Failed to lookup in group " << group_id
<< ": key size: " << remote
.size() << ", err: " << err
;
580 throw std::runtime_error(str
.str());
583 return std::string((const char *)buf
, strlen(buf
));
586 std::string
session::lookup_addr(const struct dnet_id
&id
)
590 int err
= dnet_lookup_addr(m_session
, NULL
, 0, (struct dnet_id
*)&id
, id
.group_id
, buf
, sizeof(buf
));
592 std::ostringstream str
;
593 str
<< "Failed to lookup " << dnet_dump_id(&id
) << ": err: " << err
;
594 throw std::runtime_error(str
.str());
597 return std::string((const char *)buf
, strlen(buf
));
600 std::string
session::create_metadata(const struct dnet_id
&id
, const std::string
&obj
,
601 const std::vector
<int> &groups
, const struct timespec
&ts
)
603 struct dnet_metadata_control ctl
;
604 struct dnet_meta_container mc
;
607 memset(&mc
, 0, sizeof(struct dnet_meta_container
));
608 memset(&ctl
, 0, sizeof(struct dnet_metadata_control
));
610 ctl
.obj
= (char *)obj
.data();
611 ctl
.len
= obj
.size();
613 ctl
.groups
= (int *)&groups
[0];
614 ctl
.group_num
= groups
.size();
619 err
= dnet_create_metadata(m_session
, &ctl
, &mc
);
621 std::ostringstream str
;
622 str
<< "Failed to create metadata: key: " << dnet_dump_id(&id
) << ", err: " << err
;
623 throw std::runtime_error(str
.str());
629 ret
.assign((char *)mc
.data
, mc
.size
);
639 int session::write_metadata(const struct dnet_id
&id
, const std::string
&obj
,
640 const std::vector
<int> &groups
, const struct timespec
&ts
, uint64_t cflags
)
644 struct dnet_meta_container mc
;
646 if (dnet_flags(m_node
->m_node
) & DNET_CFG_NO_META
)
649 meta
= create_metadata(id
, obj
, groups
, ts
);
651 mc
.data
= (void *)meta
.data();
652 mc
.size
= meta
.size();
656 err
= dnet_write_metadata(m_session
, &mc
, 1, cflags
);
658 std::ostringstream str
;
659 str
<< "Failed to write metadata: key: " << dnet_dump_id(&id
) << ", err: " << err
;
660 throw std::runtime_error(str
.str());
666 void session::transform(const std::string
&data
, struct dnet_id
&id
)
668 dnet_transform(m_node
->m_node
, (void *)data
.data(), data
.size(), &id
);
671 void session::lookup(const struct dnet_id
&id
, const callback
&c
)
673 int err
= dnet_lookup_object(m_session
, (struct dnet_id
*)&id
, 0,
674 callback::complete_callback
,
678 std::ostringstream str
;
679 str
<< "Failed to lookup ID " << dnet_dump_id(&id
) << ": " << err
;
680 throw std::runtime_error(str
.str());
684 void session::lookup(const std::string
&data
, const callback
&c
)
687 int error
= -ENOENT
, i
, num
, *g
;
692 num
= dnet_mix_states(m_session
, &id
, &g
);
694 throw std::bad_alloc();
696 for (i
=0; i
<num
; ++i
) {
712 std::ostringstream str
;
713 str
<< "Failed to lookup data object: key: " << dnet_dump_id(&id
);
714 throw std::runtime_error(str
.str());
718 std::string
session::lookup(const std::string
&data
)
721 int error
= -ENOENT
, i
, num
, *g
;
727 num
= dnet_mix_states(m_session
, &id
, &g
);
729 throw std::bad_alloc();
731 for (i
=0; i
<num
; ++i
) {
739 if (ret
.size() < sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
)) {
740 std::stringstream str
;
742 str
<< dnet_dump_id(&id
) << ": failed to receive lookup request";
743 throw std::runtime_error(str
.str());
745 /* reply parsing examaple */
747 struct dnet_addr
*addr
= (struct dnet_addr
*)ret
.data();
748 struct dnet_cmd
*cmd
= (struct dnet_cmd
*)(addr
+ 1);
750 if (cmd
->size
> sizeof(struct dnet_addr_attr
)) {
751 struct dnet_addr_attr
*a
= (struct dnet_addr_attr
*)(cmd
+ 1);
752 struct dnet_file_info
*info
= (struct dnet_file_info
*)(a
+ 1);
754 dnet_convert_addr_attr(a
);
755 dnet_convert_file_info(info
);
758 dnet_log_raw(m_node
->m_node
, DNET_LOG_DEBUG
, "%s: %s: %zu bytes\n", dnet_dump_id(&id
), data
.c_str(), ret
.size());
761 } catch (const std::exception
&e
) {
762 dnet_log_raw(m_node
->m_node
, DNET_LOG_ERROR
, "%s: %s : %s\n", dnet_dump_id(&id
), e
.what(), data
.c_str());
770 std::ostringstream str
;
771 str
<< data
<< ": could not find object";
773 throw std::runtime_error(str
.str());
779 std::string
session::lookup(const struct dnet_id
&id
)
790 if (ret
.size() < sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
)) {
791 std::stringstream str
;
793 str
<< dnet_dump_id(&id
) << ": failed to receive lookup request";
794 throw std::runtime_error(str
.str());
797 dnet_log_raw(m_node
->m_node
, DNET_LOG_DEBUG
, "%s: %zu bytes\n", dnet_dump_id(&id
), ret
.size());
799 } catch (const std::exception
&e
) {
800 dnet_log_raw(m_node
->m_node
, DNET_LOG_ERROR
, "%s: %s\n", dnet_dump_id(&id
), e
.what());
804 std::ostringstream str
;
805 str
<< dnet_dump_id(&id
) << ": could not find object";
807 throw std::runtime_error(str
.str());
813 void session::remove_raw(struct dnet_id
&id
, uint64_t cflags
, uint64_t ioflags
)
816 std::vector
<int> g
= groups
;
818 for (int i
=0; i
<(int)g
.size(); ++i
) {
821 if (!dnet_remove_object_now(m_session
, &id
, cflags
, ioflags
))
826 std::ostringstream str
;
827 str
<< dnet_dump_id(&id
) << ": REMOVE: " << err
;
828 throw std::runtime_error(str
.str());
832 void session::remove(struct dnet_id
&id
)
834 remove_raw(id
, 0, 0);
837 void session::remove_raw(const std::string
&data
, int type
, uint64_t cflags
, uint64_t ioflags
)
844 remove_raw(id
, cflags
, ioflags
);
847 void session::remove(const std::string
&data
, int type
)
849 remove_raw(data
, type
, 0, 0);
852 std::string
session::stat_log()
858 err
= dnet_request_stat(m_session
, NULL
, DNET_CMD_STAT
, 0,
859 callback::complete_callback
, (void *)&c
);
861 std::ostringstream str
;
862 str
<< "Failed to request statistics: " << err
;
863 throw std::runtime_error(str
.str());
868 /* example reply parsing */
871 const void *data
= ret
.data();
872 int size
= ret
.size();
873 char id_str
[DNET_ID_SIZE
*2 + 1];
877 struct dnet_addr
*addr
= (struct dnet_addr
*)data
;
878 struct dnet_cmd
*cmd
= (struct dnet_cmd
*)(addr
+ 1);
879 struct dnet_stat
*st
= (struct dnet_stat
*)(cmd
+ 1);
881 dnet_convert_stat(st
);
883 la
[0] = (float)st
->la
[0] / 100.0;
884 la
[1] = (float)st
->la
[1] / 100.0;
885 la
[2] = (float)st
->la
[2] / 100.0;
887 printf("<stat addr=\"%s\" id=\"%s\"><la>%.2f %.2f %.2f</la>"
888 "<memtotal>%llu KB</memtotal><memfree>%llu KB</memfree><memcached>%llu KB</memcached>"
889 "<storage_size>%llu MB</storage_size><available_size>%llu MB</available_size>"
890 "<files>%llu</files><fsid>0x%llx</fsid></stat>",
891 dnet_server_convert_dnet_addr_raw(addr
, addr_str
, sizeof(addr_str
)),
892 dnet_dump_id_len_raw(cmd
->id
.id
, DNET_ID_SIZE
, id_str
),
894 (unsigned long long)st
->vm_total
,
895 (unsigned long long)st
->vm_free
,
896 (unsigned long long)st
->vm_cached
,
897 (unsigned long long)(st
->frsize
* st
->blocks
/ 1024 / 1024),
898 (unsigned long long)(st
->bavail
* st
->bsize
/ 1024 / 1024),
899 (unsigned long long)st
->files
, (unsigned long long)st
->fsid
);
902 int sz
= sizeof(*addr
) + sizeof(*cmd
) + cmd
->size
;
909 if (ret
.size() < sizeof(struct dnet_addr
) + sizeof(struct dnet_cmd
) + sizeof(struct dnet_stat
))
910 throw std::runtime_error("Failed to request statistics: not enough data returned");
914 int session::state_num(void)
916 return dnet_state_num(m_session
);
919 int session::request_cmd(struct dnet_trans_control
&ctl
)
923 err
= dnet_request_cmd(m_session
, &ctl
);
925 std::ostringstream str
;
926 str
<< dnet_dump_id(&ctl
.id
) << ": failed to request cmd: " << dnet_cmd_string(ctl
.cmd
) << ": " << err
;
927 throw std::runtime_error(str
.str());
933 void session::update_status(const char *saddr
, const int port
, const int family
, struct dnet_node_status
*status
)
936 struct dnet_addr addr
;
939 memset(&addr
, 0, sizeof(addr
));
940 addr
.addr_len
= sizeof(addr
.addr
);
942 snprintf(sport
, sizeof(sport
), "%d", port
);
944 err
= dnet_fill_addr(&addr
, saddr
, sport
, family
, SOCK_STREAM
, IPPROTO_TCP
);
946 err
= dnet_update_status(m_session
, &addr
, NULL
, status
);
949 std::ostringstream str
;
950 str
<< saddr
<< ":" << port
<< ": failed to request set status " << std::hex
<< status
<< ": " << err
;
951 throw std::runtime_error(str
.str());
955 void session::update_status(struct dnet_id
&id
, struct dnet_node_status
*status
)
959 err
= dnet_update_status(m_session
, NULL
, &id
, status
);
961 std::ostringstream str
;
963 str
<< dnet_dump_id(&id
) << ": failed to request set status " << std::hex
<< status
<< ": " << err
;
964 throw std::runtime_error(str
.str());
968 struct range_sort_compare
{
969 bool operator () (const std::string
&s1
, const std::string
&s2
) {
970 unsigned char *id1
= (unsigned char *)s1
.data();
971 unsigned char *id2
= (unsigned char *)s2
.data();
973 int cmp
= dnet_id_cmp_str(id1
, id2
);
979 std::vector
<std::string
> session::read_data_range(struct dnet_io_attr
&io
, int group_id
, uint64_t cflags
)
981 struct dnet_range_data
*data
;
983 uint32_t ioflags
= io
.flags
;
986 data
= dnet_read_range(m_session
, &io
, group_id
, cflags
, &err
);
988 std::ostringstream str
;
989 str
<< "Failed to read range data object: group: " << group_id
<<
990 ", key: " << dnet_dump_id_str(io
.id
) <<
991 ", size: " << io
.size
<< ": err: " << strerror(-err
) << ": " << err
;
992 throw std::runtime_error(str
.str());
995 std::vector
<std::string
> ret
;
998 for (int i
= 0; i
< err
; ++i
) {
999 struct dnet_range_data
*d
= &data
[i
];
1000 char *data
= (char *)d
->data
;
1002 if (!(ioflags
& DNET_IO_FLAGS_NODATA
)) {
1004 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)data
;
1006 dnet_convert_io_attr(io
);
1010 str
.append((char *)io
->id
, DNET_ID_SIZE
);
1011 str
.append((char *)&io
->size
, sizeof(io
->size
));
1012 str
.append((const char *)(io
+ 1), io
->size
);
1016 data
+= sizeof(struct dnet_io_attr
) + io
->size
;
1017 d
->size
-= sizeof(struct dnet_io_attr
) + io
->size
;
1020 if (d
->size
!= sizeof(struct dnet_io_attr
)) {
1021 std::ostringstream str
;
1022 str
<< "Incorrect data size: d->size = " << d
->size
<<
1023 "sizeof = " << sizeof(struct dnet_io_attr
);
1024 throw std::runtime_error(str
.str());
1026 struct dnet_io_attr
*rep
= (struct dnet_io_attr
*)data
;
1035 if (ioflags
& DNET_IO_FLAGS_NODATA
) {
1036 std::ostringstream str
;
1038 ret
.push_back(str
.str());
1045 std::vector
<struct dnet_io_attr
> session::remove_data_range(struct dnet_io_attr
&io
, int group_id
, uint64_t cflags
)
1047 struct dnet_io_attr
*retp
;
1051 retp
= dnet_remove_range(m_session
, &io
, group_id
, cflags
, &ret_num
, &err
);
1054 std::ostringstream str
;
1055 str
<< "Failed to read range data object: group: " << group_id
<<
1056 ", key: " << dnet_dump_id_str(io
.id
) <<
1057 ", size: " << io
.size
<< ": err: " << strerror(-err
) << ": " << err
;
1058 throw std::runtime_error(str
.str());
1061 std::vector
<struct dnet_io_attr
> ret
;;
1064 for (int i
= 0; i
< ret_num
; ++i
) {
1065 ret
.push_back(retp
[i
]);
1074 std::string
session::write_prepare(const struct dnet_id
&id
, const std::string
&str
, uint64_t remote_offset
,
1075 uint64_t psize
, uint64_t cflags
, unsigned int ioflags
)
1077 struct dnet_io_control ctl
;
1079 memset(&ctl
, 0, sizeof(ctl
));
1081 ctl
.cflags
= cflags
;
1082 ctl
.data
= str
.data();
1084 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_PREPARE
| DNET_IO_FLAGS_PLAIN_WRITE
;
1085 ctl
.io
.offset
= remote_offset
;
1086 ctl
.io
.size
= str
.size();
1087 ctl
.io
.type
= id
.type
;
1090 memcpy(&ctl
.id
, &id
, sizeof(id
));
1094 char *result
= NULL
;
1095 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
1097 std::ostringstream string
;
1098 string
<< dnet_dump_id(&ctl
.id
) << ": write_prepare: size: " << str
.size() << ", err: " << err
;
1099 throw std::runtime_error(string
.str());
1102 std::string
ret(result
, err
);
1108 std::string
session::write_prepare(const std::string
&remote
, const std::string
&str
, uint64_t remote_offset
,
1109 uint64_t psize
, uint64_t cflags
, unsigned int ioflags
, int type
)
1112 memset(&id
, 0, sizeof(id
));
1113 transform(remote
, id
);
1115 return write_prepare(id
, str
, remote_offset
, psize
, cflags
, ioflags
);
1118 std::string
session::write_commit(const struct dnet_id
&id
, const std::string
&str
, uint64_t remote_offset
, uint64_t csize
,
1119 uint64_t cflags
, unsigned int ioflags
)
1121 struct dnet_io_control ctl
;
1123 memset(&ctl
, 0, sizeof(ctl
));
1125 ctl
.cflags
= cflags
;
1126 ctl
.data
= str
.data();
1128 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_COMMIT
| DNET_IO_FLAGS_PLAIN_WRITE
;
1129 ctl
.io
.offset
= remote_offset
;
1130 ctl
.io
.size
= str
.size();
1131 ctl
.io
.type
= id
.type
;
1134 memcpy(&ctl
.id
, &id
, sizeof(id
));
1138 char *result
= NULL
;
1139 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
1141 std::ostringstream string
;
1142 string
<< dnet_dump_id(&ctl
.id
) << ": write_commit: size: " << str
.size() << ", err: " << err
;
1143 throw std::runtime_error(string
.str());
1146 std::string
ret(result
, err
);
1152 std::string
session::write_commit(const std::string
&remote
, const std::string
&str
, uint64_t remote_offset
, uint64_t csize
,
1153 uint64_t cflags
, unsigned int ioflags
, int type
)
1156 memset(&id
, 0, sizeof(id
));
1157 transform(remote
, id
);
1159 return write_commit(id
, str
, remote_offset
, csize
, cflags
, ioflags
);
1162 std::string
session::write_plain(const struct dnet_id
&id
, const std::string
&str
, uint64_t remote_offset
,
1163 uint64_t cflags
, unsigned int ioflags
)
1165 struct dnet_io_control ctl
;
1167 memset(&ctl
, 0, sizeof(ctl
));
1169 ctl
.cflags
= cflags
;
1170 ctl
.data
= str
.data();
1172 ctl
.io
.flags
= ioflags
| DNET_IO_FLAGS_PLAIN_WRITE
;
1173 ctl
.io
.offset
= remote_offset
;
1174 ctl
.io
.size
= str
.size();
1175 ctl
.io
.type
= id
.type
;
1177 memcpy(&ctl
.id
, &id
, sizeof(id
));
1181 char *result
= NULL
;
1182 int err
= dnet_write_data_wait(m_session
, &ctl
, (void **)&result
);
1184 std::ostringstream string
;
1185 string
<< dnet_dump_id(&ctl
.id
) << ": write_plain: size: " << str
.size() << ", err: " << err
;
1186 throw std::runtime_error(string
.str());
1189 std::string
ret(result
, err
);
1195 std::string
session::write_plain(const std::string
&remote
, const std::string
&str
, uint64_t remote_offset
,
1196 uint64_t cflags
, unsigned int ioflags
, int type
)
1199 memset(&id
, 0, sizeof(id
));
1200 transform(remote
, id
);
1202 return write_plain(id
, str
, remote_offset
, cflags
, ioflags
);
1205 std::vector
<std::pair
<struct dnet_id
, struct dnet_addr
> > session::get_routes()
1207 std::vector
<std::pair
<struct dnet_id
, struct dnet_addr
> > res
;
1208 struct dnet_id
*ids
= NULL
;
1209 struct dnet_addr
*addrs
= NULL
;
1213 count
= dnet_get_routes(m_session
, &ids
, &addrs
);
1216 for (int i
= 0; i
< count
; ++i
) {
1217 res
.push_back(std::make_pair(ids
[i
], addrs
[i
]));
1230 std::string
session::request(struct dnet_id
*id
, struct sph
*sph
, bool lock
)
1232 std::string ret_str
;
1238 err
= dnet_send_cmd(m_session
, id
, sph
, &ret
);
1240 err
= dnet_send_cmd_nolock(m_session
, id
, sph
, &ret
);
1243 std::ostringstream str
;
1245 str
<< dnet_dump_id(id
) << ": failed to send request: " << strerror(-err
) << ": " << err
;
1246 throw std::runtime_error(str
.str());
1251 ret_str
.assign((char *)ret
, err
);
1262 std::string
session::raw_exec(struct dnet_id
*id
, const struct sph
*orig_sph
,
1263 const std::string
&event
, const std::string
&data
, const std::string
&binary
, bool lock
)
1265 std::vector
<char> vec(event
.size() + data
.size() + binary
.size() + sizeof(struct sph
));
1266 std::string ret_str
;
1268 struct sph
*sph
= (struct sph
*)&vec
[0];
1270 memset(sph
, 0, sizeof(struct sph
));
1273 sph
->flags
&= ~DNET_SPH_FLAGS_SRC_BLOCK
;
1275 sph
->flags
= DNET_SPH_FLAGS_SRC_BLOCK
;
1276 memcpy(sph
->src
.id
, id
->id
, sizeof(sph
->src
.id
));
1279 sph
->data_size
= data
.size();
1280 sph
->binary_size
= binary
.size();
1281 sph
->event_size
= event
.size();
1283 memcpy(sph
->data
, event
.data(), event
.size());
1284 memcpy(sph
->data
+ event
.size(), data
.data(), data
.size());
1285 memcpy(sph
->data
+ event
.size() + data
.size(), binary
.data(), binary
.size());
1287 return request(id
, sph
, lock
);
1290 std::string
session::exec_locked(struct dnet_id
*id
, const std::string
&event
, const std::string
&data
, const std::string
&binary
)
1292 return raw_exec(id
, NULL
, event
, data
, binary
, true);
1295 std::string
session::exec_unlocked(struct dnet_id
*id
, const std::string
&event
, const std::string
&data
, const std::string
&binary
)
1297 return raw_exec(id
, NULL
, event
, data
, binary
, false);
1300 std::string
session::push_locked(struct dnet_id
*id
, const struct sph
&sph
, const std::string
&event
,
1301 const std::string
&data
, const std::string
&binary
)
1303 return raw_exec(id
, &sph
, event
, data
, binary
, true);
1306 std::string
session::push_unlocked(struct dnet_id
*id
, const struct sph
&sph
, const std::string
&event
,
1307 const std::string
&data
, const std::string
&binary
)
1309 return raw_exec(id
, &sph
, event
, data
, binary
, false);
1312 void session::reply(const struct sph
&orig_sph
, const std::string
&event
, const std::string
&data
, const std::string
&binary
)
1314 std::vector
<char> vec(event
.size() + data
.size() + binary
.size() + sizeof(struct sph
));
1315 std::string ret_str
;
1317 struct sph
*sph
= (struct sph
*)&vec
[0];
1321 sph
->data_size
= data
.size();
1322 sph
->binary_size
= binary
.size();
1323 sph
->event_size
= event
.size();
1325 memcpy(sph
->data
, event
.data(), event
.size());
1326 memcpy(sph
->data
+ event
.size(), data
.data(), data
.size());
1327 memcpy(sph
->data
+ event
.size() + data
.size(), binary
.data(), binary
.size());
1330 dnet_setup_id(&id
, 0, sph
->src
.id
);
1333 request(&id
, sph
, false);
1337 bool dnet_io_attr_compare(const struct dnet_io_attr
&io1
, const struct dnet_io_attr
&io2
) {
1340 cmp
= dnet_id_cmp_str(io1
.id
, io2
.id
);
1345 std::vector
<std::string
> session::bulk_read(const std::vector
<struct dnet_io_attr
> &ios
, uint64_t cflags
)
1347 struct dnet_range_data
*data
;
1350 num
= dnet_mix_states(m_session
, NULL
, &g
);
1352 throw std::runtime_error("could not fetch groups: " + std::string(strerror(num
)));
1354 std::vector
<int> groups
;
1356 groups
.assign(g
, g
+ num
);
1363 std::vector
<struct dnet_io_attr
> tmp_ios
= ios
;
1364 std::sort(tmp_ios
.begin(), tmp_ios
.end(), dnet_io_attr_compare
);
1366 std::vector
<std::string
> ret
;
1368 for (std::vector
<int>::iterator group
= groups
.begin(); group
!= groups
.end(); ++group
) {
1369 if (!tmp_ios
.size())
1372 data
= dnet_bulk_read(m_session
, (struct dnet_io_attr
*)(&tmp_ios
[0]), tmp_ios
.size(), *group
, cflags
, &err
);
1374 std::ostringstream str
;
1375 str
<< "Failed to read bulk data: group: " << *group
<<
1376 ": err: " << strerror(-err
) << ": " << err
;
1377 throw std::runtime_error(str
.str());
1381 for (int i
= 0; i
< err
; ++i
) {
1382 struct dnet_range_data
*d
= &data
[i
];
1383 char *data
= (char *)d
->data
;
1386 struct dnet_io_attr
*io
= (struct dnet_io_attr
*)data
;
1388 for (std::vector
<struct dnet_io_attr
>::iterator it
= tmp_ios
.begin(); it
!= tmp_ios
.end(); ++it
) {
1389 int cmp
= dnet_id_cmp_str(it
->id
, io
->id
);
1397 dnet_convert_io_attr(io
);
1399 uint64_t size
= dnet_bswap64(io
->size
);
1403 str
.append((char *)io
->id
, DNET_ID_SIZE
);
1404 str
.append((char *)&size
, 8);
1405 str
.append((const char *)(io
+ 1), io
->size
);
1409 data
+= sizeof(struct dnet_io_attr
) + io
->size
;
1410 d
->size
-= sizeof(struct dnet_io_attr
) + io
->size
;
1423 std::vector
<std::string
> session::bulk_read(const std::vector
<std::string
> &keys
, uint64_t cflags
)
1425 std::vector
<struct dnet_io_attr
> ios
;
1426 struct dnet_io_attr io
;
1427 memset(&io
, 0, sizeof(io
));
1429 ios
.reserve(keys
.size());
1431 for (size_t i
= 0; i
< keys
.size(); ++i
) {
1434 transform(keys
[i
], id
);
1435 memcpy(io
.id
, id
.id
, sizeof(io
.id
));
1439 return bulk_read(ios
, cflags
);
1442 std::string
session::bulk_write(const std::vector
<struct dnet_io_attr
> &ios
, const std::vector
<std::string
> &data
, uint64_t cflags
)
1444 std::vector
<struct dnet_io_control
> ctls
;
1448 if (ios
.size() != data
.size()) {
1449 std::ostringstream string
;
1450 string
<< "BULK_WRITE: ios doesn't meet data: io.size: " << ios
.size() << ", data.size: " << data
.size();
1451 throw std::runtime_error(string
.str());
1454 ctls
.reserve(ios
.size());
1456 for(i
= 0; i
< ios
.size(); ++i
) {
1457 struct dnet_io_control ctl
;
1458 memset(&ctl
, 0, sizeof(ctl
));
1460 ctl
.cflags
= cflags
;
1461 ctl
.data
= data
[i
].data();
1465 dnet_setup_id(&ctl
.id
, 0, (unsigned char *)ios
[i
].id
);
1466 ctl
.id
.type
= ios
[i
].type
;
1470 ctls
.push_back(ctl
);
1473 struct dnet_range_data ret
= dnet_bulk_write(m_session
, &ctls
[0], ctls
.size(), &err
);
1475 std::ostringstream string
;
1476 string
<< "BULK_WRITE: size: " << ret
.size
<< ", err: " << err
;
1477 throw std::runtime_error(string
.str());
1480 std::string
ret_str((const char *)ret
.data
, ret
.size
);
1486 struct dnet_node
* session::get_node()
1488 return m_node
->m_node
;