Fixed reply size calculation
[elliptics.git] / cache / cache.cpp
blob88f71f49109b21b9ccb42f0aea3203ec45c99a99
/*
 * 2012+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
 * All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
#include <cstring>
#include <iostream>
#include <deque>
#include <vector>

#include <boost/unordered_map.hpp>
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/set.hpp>

#include "../library/elliptics.h"

#include "elliptics/packet.h"
#include "elliptics/interface.h"
33 namespace ioremap { namespace cache {
35 struct key_t {
36 key_t(const unsigned char *id) {
37 memcpy(this->id, id, DNET_ID_SIZE);
40 unsigned char id[DNET_ID_SIZE];
43 size_t hash(const unsigned char *id) {
44 size_t num = DNET_ID_SIZE / sizeof(size_t);
46 size_t *ptr = (size_t *)id;
47 size_t hash = 0x883eaf5a;
48 for (size_t i = 0; i < num; ++i)
49 hash ^= ptr[i];
51 return hash;
54 struct hash_t {
55 std::size_t operator()(const key_t &key) const {
56 return ioremap::cache::hash(key.id);
60 struct equal_to {
61 bool operator() (const key_t &x, const key_t &y) const {
62 return memcmp(x.id, y.id, DNET_ID_SIZE) == 0;
/*
 * Owning byte buffer: keeps a private copy of the bytes handed to the
 * constructor.
 */
class raw_data_t {
	public:
		/* Copies @size bytes starting at @data into the internal buffer. */
		raw_data_t(const char *data, size_t size) {
			const char *last = data + size;

			m_data.reserve(size);
			m_data.insert(m_data.begin(), data, last);
		}

		/* Mutable access to the stored bytes. */
		std::vector<char> &data(void) {
			return m_data;
		}

		/* Number of bytes currently stored. */
		size_t size(void) {
			return m_data.size();
		}

	private:
		std::vector<char> m_data;
};
85 struct data_lru_tag_t;
86 typedef boost::intrusive::list_base_hook<boost::intrusive::tag<data_lru_tag_t>,
87 boost::intrusive::link_mode<boost::intrusive::safe_link>
88 > lru_list_base_hook_t;
89 struct data_set_tag_t;
90 typedef boost::intrusive::set_base_hook<boost::intrusive::tag<data_set_tag_t>,
91 boost::intrusive::link_mode<boost::intrusive::safe_link>
92 > set_base_hook_t;
94 struct time_set_tag_t;
95 typedef boost::intrusive::set_base_hook<boost::intrusive::tag<time_set_tag_t>,
96 boost::intrusive::link_mode<boost::intrusive::safe_link>
97 > time_set_base_hook_t;
99 class data_t : public lru_list_base_hook_t, public set_base_hook_t, public time_set_base_hook_t {
100 public:
101 data_t(const unsigned char *id) : m_lifetime(0) {
102 memcpy(m_id.id, id, DNET_ID_SIZE);
105 data_t(const unsigned char *id, size_t lifetime, const char *data, size_t size, bool remove_from_disk) :
106 m_lifetime(0), m_remove_from_disk(remove_from_disk) {
107 memcpy(m_id.id, id, DNET_ID_SIZE);
109 if (lifetime)
110 m_lifetime = lifetime + time(NULL);
112 m_data.reset(new raw_data_t(data, size));
115 ~data_t() {
118 const struct dnet_raw_id &id(void) const {
119 return m_id;
122 boost::shared_ptr<raw_data_t> data(void) const {
123 return m_data;
126 size_t lifetime(void) const {
127 return m_lifetime;
130 bool remove_from_disk() const {
131 return m_remove_from_disk;
134 size_t size(void) const {
135 return m_data->size();
138 friend bool operator< (const data_t &a, const data_t &b) {
139 return dnet_id_cmp_str(a.id().id, b.id().id) < 0;
142 friend bool operator> (const data_t &a, const data_t &b) {
143 return dnet_id_cmp_str(a.id().id, b.id().id) > 0;
146 friend bool operator== (const data_t &a, const data_t &b) {
147 return dnet_id_cmp_str(a.id().id, b.id().id) == 0;
150 private:
151 size_t m_lifetime;
152 bool m_remove_from_disk;
153 struct dnet_raw_id m_id;
154 boost::shared_ptr<raw_data_t> m_data;
157 typedef boost::intrusive::list<data_t, boost::intrusive::base_hook<lru_list_base_hook_t> > lru_list_t;
158 typedef boost::intrusive::set<data_t, boost::intrusive::base_hook<set_base_hook_t>,
159 boost::intrusive::compare<std::less<data_t> >
160 > iset_t;
162 struct lifetime_less {
163 bool operator() (const data_t &x, const data_t &y) const {
164 return x.lifetime() < y.lifetime();
168 typedef boost::intrusive::set<data_t, boost::intrusive::base_hook<time_set_base_hook_t>,
169 boost::intrusive::compare<lifetime_less>
170 > life_set_t;
172 class cache_t {
173 public:
174 cache_t(struct dnet_node *n) : m_need_exit(false), m_node(n), m_cache_size(0), m_max_cache_size(n->cache_size) {
175 m_lifecheck = boost::thread(boost::bind(&cache_t::life_check, this));
178 ~cache_t() {
179 m_need_exit = true;
180 m_lifecheck.join();
182 while (!m_lru.empty()) {
183 data_t raw = m_lru.front();
184 erase_element(&raw);
188 void write(const unsigned char *id, size_t lifetime, const char *data, size_t size, bool remove_from_disk) {
189 boost::mutex::scoped_lock guard(m_lock);
191 iset_t::iterator it = m_set.find(id);
192 if (it != m_set.end())
193 erase_element(&(*it));
195 if (size + m_cache_size > m_max_cache_size)
196 resize(size * 2);
199 * nothing throws exception below this 'new' operator, so there is no try/catch block
201 data_t *raw = new data_t(id, lifetime, data, size, remove_from_disk);
203 m_set.insert(*raw);
204 m_lru.push_back(*raw);
205 if (lifetime)
206 m_lifeset.insert(*raw);
208 m_cache_size += size;
211 boost::shared_ptr<raw_data_t> read(const unsigned char *id) {
212 boost::mutex::scoped_lock guard(m_lock);
214 iset_t::iterator it = m_set.find(id);
215 if (it == m_set.end())
216 throw std::runtime_error("no record");
218 m_lru.erase(m_lru.iterator_to(*it));
219 m_lru.push_back(*it);
220 return it->data();
223 bool remove(const unsigned char *id) {
224 bool removed = false;
225 bool remove_from_disk = false;
227 boost::mutex::scoped_lock guard(m_lock);
228 iset_t::iterator it = m_set.find(id);
229 if (it != m_set.end()) {
230 remove_from_disk = it->remove_from_disk();
231 erase_element(&(*it));
232 removed = true;
235 guard.unlock();
237 if (remove_from_disk) {
238 struct dnet_id raw;
240 dnet_setup_id(&raw, 0, (unsigned char *)id);
241 raw.type = -1;
243 dnet_remove_local(m_node, &raw);
246 return removed;
249 private:
250 bool m_need_exit;
251 struct dnet_node *m_node;
252 size_t m_cache_size, m_max_cache_size;
253 boost::mutex m_lock;
254 iset_t m_set;
255 lru_list_t m_lru;
256 life_set_t m_lifeset;
257 boost::thread m_lifecheck;
259 void resize(size_t reserve) {
260 while (!m_lru.empty()) {
261 data_t *raw = &m_lru.front();
263 erase_element(raw);
266 /* break early if free space in cache more than requested reserve */
267 if (m_max_cache_size - m_cache_size > reserve)
268 break;
272 void erase_element(data_t *obj) {
273 m_lru.erase(m_lru.iterator_to(*obj));
274 m_set.erase(m_set.iterator_to(*obj));
275 if (obj->lifetime())
276 m_lifeset.erase(m_lifeset.iterator_to(*obj));
278 m_cache_size -= obj->size();
280 delete obj;
283 void life_check(void) {
284 while (!m_need_exit) {
285 std::deque<struct dnet_id> remove;
287 while (!m_need_exit && !m_lifeset.empty()) {
288 size_t time = ::time(NULL);
290 boost::mutex::scoped_lock guard(m_lock);
292 if (m_lifeset.empty())
293 break;
295 life_set_t::iterator it = m_lifeset.begin();
296 if (it->lifetime() > time)
297 break;
299 if (it->remove_from_disk()) {
300 struct dnet_id id;
302 dnet_setup_id(&id, 0, (unsigned char *)it->id().id);
303 id.type = -1;
305 remove.push_back(id);
308 erase_element(&(*it));
311 for (std::deque<struct dnet_id>::iterator it = remove.begin(); it != remove.end(); ++it) {
312 dnet_remove_local(m_node, &(*it));
315 sleep(1);
322 using namespace ioremap::cache;
324 int dnet_cmd_cache_io(struct dnet_net_state *st, struct dnet_cmd *cmd, struct dnet_io_attr *io, char *data)
326 struct dnet_node *n = st->n;
327 int err = -ENOTSUP;
329 if (!n->cache)
330 return -ENOTSUP;
332 cache_t *cache = (cache_t *)n->cache;
334 try {
335 boost::shared_ptr<raw_data_t> d;
337 switch (cmd->cmd) {
338 case DNET_CMD_WRITE:
339 cache->write(io->id, io->start, data, io->size, !!(io->flags & DNET_IO_FLAGS_CACHE_REMOVE_FROM_DISK));
340 err = 0;
341 break;
342 case DNET_CMD_READ:
343 d = cache->read(io->id);
344 if (io->offset + io->size > d->size()) {
345 dnet_log_raw(n, DNET_LOG_ERROR, "%s: %s cache: invalid offset/size: "
346 "offset: %llu, size: %llu, cached-size: %zd\n",
347 dnet_dump_id(&cmd->id), dnet_cmd_string(cmd->cmd),
348 (unsigned long long)io->offset, (unsigned long long)io->size,
349 d->size());
350 err = -EINVAL;
351 break;
354 io->size = d->size();
355 err = dnet_send_read_data(st, cmd, io, (char *)d->data().data() + io->offset, -1, io->offset, 0);
356 break;
357 case DNET_CMD_DEL:
358 err = -ENOENT;
359 if (cache->remove(cmd->id.id))
360 err = 0;
361 break;
363 } catch (const std::exception &e) {
364 dnet_log_raw(n, DNET_LOG_ERROR, "%s: %s cache operation failed: %s\n",
365 dnet_dump_id(&cmd->id), dnet_cmd_string(cmd->cmd), e.what());
366 err = -ENOENT;
369 return err;
372 int dnet_cache_init(struct dnet_node *n)
374 if (!n->cache_size)
375 return 0;
377 try {
378 n->cache = (void *)(new cache_t(n));
379 } catch (const std::exception &e) {
380 dnet_log_raw(n, DNET_LOG_ERROR, "Could not create cache: %s\n", e.what());
381 return -ENOMEM;
384 return 0;
387 void dnet_cache_cleanup(struct dnet_node *n)
389 if (n->cache)
390 delete (cache_t *)n->cache;