TransitionalCache: update key_transform to handle prefixes
r2/r2/lib/cache.py
# The contents of this file are subject to the Common Public Attribution
# License Version 1.0. (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
# software over a computer network and provide for limited attribution for the
# Original Developer. In addition, Exhibit A has been modified to be consistent
# with Exhibit B.
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
# the specific language governing rights and limitations under the License.
#
# The Original Code is reddit.
#
# The Original Developer is the Initial Developer. The Initial Developer of
# the Original Code is reddit Inc.
#
# All portions of the code written by reddit are Copyright (c) 2006-2015 reddit
# Inc. All Rights Reserved.
###############################################################################
import sys
from threading import local
from hashlib import md5
import cPickle as pickle
from copy import copy
from curses.ascii import isgraph
import logging
import random
from time import sleep

from pylons import app_globals as g

import pylibmc
from _pylibmc import MemcachedError

from pycassa import ColumnFamily
from pycassa.cassandra.ttypes import ConsistencyLevel

from r2.lib.utils import in_chunks, prefix_keys, trace, tup
from r2.lib.hardcachebackend import HardCacheBackend

from r2.lib.sgm import sgm  # get this into our namespace so that it's
                            # importable from us

# This is for use in the health controller
_CACHE_SERVERS = set()


class NoneResult(object):
    pass

class CacheUtils(object):
    # Caches that never expire entries should set this to true, so that
    # CacheChain can properly count hits and misses.
    permanent = False

    def incr_multi(self, keys, delta=1, prefix=''):
        for k in keys:
            try:
                self.incr(prefix + k, delta)
            except ValueError:
                pass

    def add_multi(self, keys, prefix='', time=0):
        for k, v in keys.iteritems():
            self.add(prefix + str(k), v, time=time)

    def get_multi(self, keys, prefix='', **kw):
        return prefix_keys(keys, prefix,
                           lambda k: self.simple_get_multi(k, **kw))
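
# A minimal sketch of the prefix handling above (the cache contents are
# hypothetical): get_multi prepends the prefix on lookup via prefix_keys, but
# the results come back keyed by the caller's original, unprefixed keys.
#
#     cache.set_multi({'1': 'a', '2': 'b'}, prefix='thing_')
#     cache.get_multi(['1', '2'], prefix='thing_')
#     # -> {'1': 'a', '2': 'b'}, fetched from 'thing_1' and 'thing_2'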

class MemcachedValueSizeException(Exception):
    def __init__(self, cache_name, caller, prefix, key, size):
        self.key = key
        self.size = size
        self.cache_name = cache_name
        self.caller = caller
        self.prefix = prefix

    def __str__(self):
        return ("Memcached %s %s: The object for key '%s%s' is too big "
                "for memcached at %s bytes" %
                (self.cache_name, self.caller,
                 self.prefix, self.key, self.size))

# validation functions to be used by memcached pools
MEMCACHED_MAX_VALUE_SIZE = 2048 * 1024  # 2MB


def is_valid_size_for_cache(obj):
    # NOTE: only the memory consumption directly attributed to the object is
    # accounted for, not the memory consumption of objects it refers to.
    # For instance a tuple of strings will only appear to be the size of a
    # tuple.
    return sys.getsizeof(obj) < MEMCACHED_MAX_VALUE_SIZE

def validate_size_warn(**kwargs):
    if 'value' in kwargs:
        if not is_valid_size_for_cache(kwargs["value"]):
            key = ".".join((
                "memcached_large_object",
                kwargs.get("cache_name", "undefined"),
            ))
            g.stats.simple_event(key)
            g.log.debug(
                "Memcached %s: Attempted to cache an object > 1MB at key: '%s%s' of size %s bytes",
                kwargs.get("caller", "unknown"),
                kwargs.get("prefix", ""),
                kwargs.get("key", "undefined"),
                sys.getsizeof(kwargs["value"]),
            )
            return False

    return True

def validate_size_error(**kwargs):
    if 'value' in kwargs:
        if not is_valid_size_for_cache(kwargs["value"]):
            raise MemcachedValueSizeException(
                kwargs.get("cache_name", "unknown"),
                kwargs.get("caller", "unknown"),
                kwargs.get("prefix", ""),
                kwargs.get("key", "undefined"),
                sys.getsizeof(kwargs["value"]),
            )

    return True

class CMemcache(CacheUtils):
    def __init__(self,
                 name,
                 servers,
                 debug=False,
                 noreply=False,
                 no_block=False,
                 min_compress_len=512 * 1024,
                 num_clients=10,
                 binary=False,
                 validators=None):
        self.name = name
        self.servers = servers
        self.clients = pylibmc.ClientPool(n_slots=num_clients)
        self.validators = validators or []

        for x in xrange(num_clients):
            client = pylibmc.Client(servers, binary=binary)
            behaviors = {
                'no_block': no_block,  # use async I/O
                'tcp_nodelay': True,  # no nagle
                '_noreply': int(noreply),
                'ketama': True,  # consistent hashing
            }
            if not binary:
                behaviors['verify_keys'] = True

            client.behaviors.update(behaviors)
            self.clients.put(client)

        self.min_compress_len = min_compress_len

        _CACHE_SERVERS.update(servers)

    def validate(self, **kwargs):
        kwargs['caller'] = sys._getframe().f_back.f_code.co_name
        kwargs['cache_name'] = self.name
        if not all(validator(**kwargs) for validator in self.validators):
            return False

        return True

    def get(self, key, default=None):
        if not self.validate(key=key):
            return default

        with self.clients.reserve() as mc:
            ret = mc.get(str(key))
            if ret is None:
                return default
            return ret

    def get_multi(self, keys, prefix=''):
        validated_keys = [k for k in (str(k) for k in keys)
                          if self.validate(prefix=prefix, key=k)]

        with self.clients.reserve() as mc:
            return mc.get_multi(validated_keys, key_prefix=prefix)

    # simple_get_multi exists so that a cache chain can
    # single-instance the handling of prefixes for performance, but
    # pylibmc does this in C which is faster anyway, so CMemcache
    # implements get_multi itself. But the CacheChain still wants
    # simple_get_multi to be available for when it has already prefixed
    # them, so here it is.
    simple_get_multi = get_multi

    def set(self, key, val, time=0):
        if not self.validate(key=key, value=val):
            return None

        with self.clients.reserve() as mc:
            return mc.set(str(key), val, time=time,
                          min_compress_len=self.min_compress_len)

    def set_multi(self, keys, prefix='', time=0):
        str_keys = ((str(k), v) for k, v in keys.iteritems())
        validated_keys = {k: v for k, v in str_keys
                          if self.validate(prefix=prefix, key=k, value=v)}

        with self.clients.reserve() as mc:
            return mc.set_multi(validated_keys, key_prefix=prefix,
                                time=time,
                                min_compress_len=self.min_compress_len)

    def add_multi(self, keys, prefix='', time=0):
        str_keys = ((str(k), v) for k, v in keys.iteritems())
        validated_keys = {k: v for k, v in str_keys
                          if self.validate(prefix=prefix, key=k, value=v)}

        with self.clients.reserve() as mc:
            return mc.add_multi(validated_keys, key_prefix=prefix,
                                time=time)

    def incr_multi(self, keys, prefix='', delta=1):
        validated_keys = [k for k in (str(k) for k in keys)
                          if self.validate(prefix=prefix, key=k)]

        with self.clients.reserve() as mc:
            return mc.incr_multi(validated_keys,
                                 key_prefix=prefix,
                                 delta=delta)

    def append(self, key, val, time=0):
        if not self.validate(key=key, value=val):
            return None

        with self.clients.reserve() as mc:
            return mc.append(str(key), val, time=time)

    def incr(self, key, delta=1, time=0):
        if not self.validate(key=key):
            return None

        # ignore the time on these
        with self.clients.reserve() as mc:
            return mc.incr(str(key), delta)

    def add(self, key, val, time=0):
        if not self.validate(key=key, value=val):
            return None

        try:
            with self.clients.reserve() as mc:
                return mc.add(str(key), val, time=time)
        except pylibmc.DataExists:
            return None

    def delete(self, key, time=0):
        if not self.validate(key=key):
            return None

        with self.clients.reserve() as mc:
            return mc.delete(str(key))

    def delete_multi(self, keys, prefix=''):
        validated_keys = [k for k in (str(k) for k in keys)
                          if self.validate(prefix=prefix, key=k)]

        with self.clients.reserve() as mc:
            return mc.delete_multi(validated_keys, key_prefix=prefix)

    def __repr__(self):
        return '<%s(%r)>' % (self.__class__.__name__,
                             self.servers)
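
# A minimal sketch of constructing a pool with the validators defined above
# (the name and server address are hypothetical):
#
#     cache = CMemcache(
#         "main",
#         ["127.0.0.1:11211"],
#         validators=[validate_size_warn],
#     )
#
# With validate_size_warn, oversized values are dropped and counted in stats;
# swapping in validate_size_error raises MemcachedValueSizeException instead.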

class HardCache(CacheUtils):
    backend = None
    permanent = True

    def __init__(self, gc):
        self.backend = HardCacheBackend(gc)

    def _split_key(self, key):
        tokens = key.split("-", 1)
        if len(tokens) != 2:
            raise ValueError("key %s has no dash" % key)

        category, ids = tokens
        return category, ids

    def set(self, key, val, time=0):
        if val == NoneResult:
            # NoneResult caching is for other parts of the chain
            return

        category, ids = self._split_key(key)
        self.backend.set(category, ids, val, time)

    def simple_get_multi(self, keys):
        results = {}
        category_bundles = {}
        for key in keys:
            category, ids = self._split_key(key)
            category_bundles.setdefault(category, []).append(ids)

        for category in category_bundles:
            idses = category_bundles[category]
            chunks = in_chunks(idses, size=50)
            for chunk in chunks:
                new_results = self.backend.get_multi(category, chunk)
                results.update(new_results)

        return results

    def set_multi(self, keys, prefix='', time=0):
        for k, v in keys.iteritems():
            if v != NoneResult:
                self.set(prefix + str(k), v, time=time)

    def get(self, key, default=None):
        category, ids = self._split_key(key)
        r = self.backend.get(category, ids)
        if r is None:
            return default
        return r

    def delete(self, key, time=0):
        # Potential optimization: When on a negative-result caching chain,
        # shove NoneResult throughout the chain when a key is deleted.
        category, ids = self._split_key(key)
        self.backend.delete(category, ids)

    def add(self, key, value, time=0):
        category, ids = self._split_key(key)
        return self.backend.add(category, ids, value, time=time)

    def incr(self, key, delta=1, time=0):
        category, ids = self._split_key(key)
        return self.backend.incr(category, ids, delta=delta, time=time)

class LocalCache(dict, CacheUtils):
    def __init__(self, *a, **kw):
        return dict.__init__(self, *a, **kw)

    def _check_key(self, key):
        if isinstance(key, unicode):
            key = str(key)  # try to convert it first
        if not isinstance(key, str):
            raise TypeError('Key is not a string: %r' % (key,))

    def get(self, key, default=None):
        r = dict.get(self, key)
        if r is None:
            return default
        return r

    def simple_get_multi(self, keys):
        out = {}
        for k in keys:
            if k in self:
                out[k] = self[k]
        return out

    def set(self, key, val, time=0):
        # time is ignored on localcache
        self._check_key(key)
        self[key] = val

    def set_multi(self, keys, prefix='', time=0):
        for k, v in keys.iteritems():
            self.set(prefix + str(k), v, time=time)

    def add(self, key, val, time=0):
        self._check_key(key)
        was = key in self
        self.setdefault(key, val)
        return not was

    def delete(self, key):
        if key in self:
            del self[key]

    def delete_multi(self, keys):
        for key in keys:
            if key in self:
                del self[key]

    def incr(self, key, delta=1, time=0):
        if key in self:
            self[key] = int(self[key]) + delta

    def decr(self, key, amt=1):
        if key in self:
            self[key] = int(self[key]) - amt

    def append(self, key, val, time=0):
        if key in self:
            self[key] = str(self[key]) + val

    def prepend(self, key, val, time=0):
        if key in self:
            self[key] = val + str(self[key])

    def replace(self, key, val, time=0):
        if key in self:
            self[key] = val

    def flush_all(self):
        self.clear()

    def __repr__(self):
        return "<LocalCache(%d)>" % (len(self),)

class TransitionalCache(CacheUtils):
    """A cache "chain" for moving keys to a new cluster live.

    `original_cache` is the cache chain previously in use (this'll frequently
    be `g.cache` since it's the catch-all for most things) and
    `replacement_cache` is the new place for the keys using this chain to
    live. `key_transform` is an optional function to translate the key names
    into different names on the `replacement_cache`.

    To use this cache chain, do three separate deployments as follows:

        * start dual-writing to the new pool by putting this chain in place
          with `read_original=True`.
        * cut reads over to the new pool after it is sufficiently heated up
          by deploying `read_original=False`.
        * remove this cache chain entirely and replace it with
          `replacement_cache`.

    This ensures that at any point, all apps regardless of their position in
    the push order will have a consistent view of the data in the cache pool
    as much as is possible.

    """

    def __init__(
            self, original_cache, replacement_cache, read_original,
            key_transform=None):
        self.original = original_cache
        self.replacement = replacement_cache
        self.read_original = read_original
        self.key_transform = key_transform

    @property
    def stats(self):
        if self.read_original:
            return self.original.stats
        else:
            return self.replacement.stats

    @property
    def read_chain(self):
        if self.read_original:
            return self.original
        else:
            return self.replacement

    @property
    def caches(self):
        if self.read_original:
            return self.original.caches
        else:
            return self.replacement.caches

    def transform_memcache_key(self, args, kwargs):
        if self.key_transform:
            old_key = args[0]
            prefix = kwargs.get("prefix", "")

            if isinstance(old_key, dict):  # multiget passes a dict
                new_key = {
                    self.key_transform(k, prefix): v
                    for k, v in old_key.iteritems()
                }
            elif isinstance(old_key, list):
                new_key = [self.key_transform(k, prefix) for k in old_key]
            else:
                new_key = self.key_transform(old_key, prefix)

            return (new_key,) + args[1:]
        else:
            return args

    def make_get_fn(fn_name):
        def transitional_cache_get_fn(self, *args, **kwargs):
            if self.read_original:
                return getattr(self.original, fn_name)(*args, **kwargs)
            else:
                args = self.transform_memcache_key(args, kwargs)

                # remove the prefix argument because the transform fn has
                # already tacked that onto the keys
                try:
                    kwargs.pop("prefix")
                except KeyError:
                    pass

                return getattr(self.replacement, fn_name)(*args, **kwargs)
        return transitional_cache_get_fn

    get = make_get_fn("get")
    get_multi = make_get_fn("get_multi")
    simple_get_multi = make_get_fn("simple_get_multi")

    def make_set_fn(fn_name):
        def transitional_cache_set_fn(self, *args, **kwargs):
            ret_original = getattr(self.original, fn_name)(*args, **kwargs)

            args = self.transform_memcache_key(args, kwargs)

            # remove the prefix argument because the transform fn has
            # already tacked that onto the keys
            try:
                kwargs.pop("prefix")
            except KeyError:
                pass

            ret_replacement = getattr(self.replacement, fn_name)(*args, **kwargs)

            if self.read_original:
                return ret_original
            else:
                return ret_replacement
        return transitional_cache_set_fn

    add = make_set_fn("add")
    set = make_set_fn("set")
    append = make_set_fn("append")
    prepend = make_set_fn("prepend")
    replace = make_set_fn("replace")
    set_multi = make_set_fn("set_multi")
    add_multi = make_set_fn("add_multi")
    incr = make_set_fn("incr")
    incr_multi = make_set_fn("incr_multi")
    decr = make_set_fn("decr")
    delete = make_set_fn("delete")
    delete_multi = make_set_fn("delete_multi")
    flush_all = make_set_fn("flush_all")
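
# A minimal sketch of the three-deploy migration described in the class
# docstring (the chains and the transform are hypothetical):
#
#     def renamed(key, prefix=""):
#         # the transform receives the prefix and must apply it itself,
#         # since the prefix kwarg is popped before hitting the replacement
#         return "new_" + prefix + key
#
#     cache = TransitionalCache(
#         original_cache=g.cache,
#         replacement_cache=new_pool_chain,
#         read_original=True,  # the second deploy flips this to False
#         key_transform=renamed,
#     )
#
# The third deploy deletes this chain and uses new_pool_chain directly.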

def cache_timer_decorator(fn_name):
    """Use to decorate CacheChain operations so timings will be recorded."""
    def wrap(fn):
        def timed_fn(self, *a, **kw):
            use_timer = kw.pop("use_timer", True)

            try:
                getattr(g, "log")
            except TypeError:
                # don't have access to g, maybe in a thread?
                return fn(self, *a, **kw)

            if use_timer and self.stats:
                publish = random.random() < g.stats.CACHE_SAMPLE_RATE
                cache_name = self.stats.cache_name
                timer_name = "cache.%s.%s" % (cache_name, fn_name)
                timer = g.stats.get_timer(timer_name, publish)
                timer.start()
            else:
                timer = None

            result = fn(self, *a, **kw)
            if timer:
                timer.stop()

            return result
        return timed_fn
    return wrap

class CacheChain(CacheUtils, local):
    def __init__(self, caches, cache_negative_results=False,
                 check_keys=True):
        self.caches = caches
        self.cache_negative_results = cache_negative_results
        self.stats = None
        self.check_keys = check_keys

    def make_set_fn(fn_name):
        @cache_timer_decorator(fn_name)
        def fn(self, *a, **kw):
            ret = None
            for c in self.caches:
                ret = getattr(c, fn_name)(*a, **kw)
            return ret
        return fn

    # note that because of the naive nature of `add' when used on a
    # cache chain, its return value isn't reliable. if you need to
    # verify its return value you'll either need to make it smarter or
    # use the underlying cache directly
    add = make_set_fn('add')

    set = make_set_fn('set')
    append = make_set_fn('append')
    prepend = make_set_fn('prepend')
    replace = make_set_fn('replace')
    set_multi = make_set_fn('set_multi')
    add_multi = make_set_fn('add_multi')
    incr = make_set_fn('incr')
    incr_multi = make_set_fn('incr_multi')
    decr = make_set_fn('decr')
    delete = make_set_fn('delete')
    delete_multi = make_set_fn('delete_multi')
    flush_all = make_set_fn('flush_all')

    cache_negative_results = False

    @cache_timer_decorator("get")
    def get(self, key, default=None, allow_local=True, stale=None):
        stat_outcome = False  # assume a miss until a result is found
        is_localcache = False
        try:
            for c in self.caches:
                is_localcache = isinstance(c, LocalCache)
                if not allow_local and is_localcache:
                    continue

                val = c.get(key)

                if val is not None:
                    if not c.permanent:
                        stat_outcome = True

                    # update other caches
                    for d in self.caches:
                        if c is d:
                            break  # so we don't set caches later in the chain
                        d.set(key, val)

                    if val == NoneResult:
                        return default
                    else:
                        return val

            if self.cache_negative_results:
                for c in self.caches[:-1]:
                    c.set(key, NoneResult)

            return default
        finally:
            if self.stats:
                if stat_outcome:
                    if not is_localcache:
                        self.stats.cache_hit()
                else:
                    self.stats.cache_miss()

    def get_multi(self, keys, prefix='', allow_local=True, **kw):
        l = lambda ks: self.simple_get_multi(ks, allow_local=allow_local, **kw)
        return prefix_keys(keys, prefix, l)

    @cache_timer_decorator("get_multi")
    def simple_get_multi(self, keys, allow_local=True, stale=None,
                         stat_subname=None):
        out = {}
        need = set(keys)
        hits = 0
        local_hits = 0
        misses = 0
        for c in self.caches:
            is_localcache = isinstance(c, LocalCache)
            if not allow_local and is_localcache:
                continue

            if c.permanent and not misses:
                # Once we reach a "permanent" cache, we count any outstanding
                # items as misses.
                misses = len(need)

            if len(out) == len(keys):
                # we've found them all
                break

            r = c.simple_get_multi(need)
            # update other caches
            if r:
                if is_localcache:
                    local_hits += len(r)
                elif not c.permanent:
                    hits += len(r)

                for d in self.caches:
                    if c is d:
                        break  # so we don't set caches later in the chain
                    d.set_multi(r)
                r.update(out)
                out = r
                need = need - set(r.keys())

        if need and self.cache_negative_results:
            d = dict((key, NoneResult) for key in need)
            for c in self.caches[:-1]:
                c.set_multi(d)

        out = dict((k, v)
                   for (k, v) in out.iteritems()
                   if v != NoneResult)

        if self.stats:
            if not misses:
                # If this chain contains no permanent caches, then we need to
                # count the misses here.
                misses = len(need)
            self.stats.cache_hit(hits, subname=stat_subname)
            self.stats.cache_miss(misses, subname=stat_subname)

        return out

    def __repr__(self):
        return '<%s %r>' % (self.__class__.__name__,
                            self.caches)

    def debug(self, key):
        print "Looking up [%r]" % key
        for i, c in enumerate(self.caches):
            print "[%d] %10s has value [%r]" % (i, c.__class__.__name__,
                                                c.get(key))

    def reset(self):
        # the first item in a cache chain is a LocalCache
        self.caches = (self.caches[0].__class__(),) + self.caches[1:]

class MemcacheChain(CacheChain):
    pass
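
# A minimal sketch of assembling a chain (the server address is
# hypothetical): the request-local LocalCache is consulted first, then
# memcached, and hits are copied back into the earlier caches in the chain.
#
#     chain = MemcacheChain((
#         LocalCache(),
#         CMemcache("main", ["127.0.0.1:11211"]),
#     ))
#     chain.set("key", "value")
#     chain.get("key")  # repeat lookups are served by the LocalCache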

class HardcacheChain(CacheChain):
    def add(self, key, val, time=0):
        # the authority is the hardcache itself
        authority = self.caches[-1]
        added_val = authority.add(key, val, time=time)
        for cache in self.caches[:-1]:
            # Calling set() rather than add() to ensure that all caches are
            # in sync and that de-syncs repair themselves
            cache.set(key, added_val, time=time)

        return added_val

    def accrue(self, key, time=0, delta=1):
        auth_value = self.caches[-1].get(key)

        if auth_value is None:
            auth_value = 0

        try:
            auth_value = int(auth_value) + delta
        except ValueError:
            raise ValueError("Can't accrue %s; it's a %s (%r)" %
                             (key, auth_value.__class__.__name__, auth_value))

        for c in self.caches:
            c.set(key, auth_value, time=time)

        return auth_value

    @property
    def backend(self):
        # the hardcache is always the last item in a HardCacheChain
        return self.caches[-1].backend

class StaleCacheChain(CacheChain):
    """A cache chain of two cache chains. When allowed by `stale`,
    answers may be returned by a "closer" but potentially older
    cache. Probably doesn't play well with NoneResult caching."""
    staleness = 30

    def __init__(self, localcache, stalecache, realcache, check_keys=True):
        self.localcache = localcache
        self.stalecache = stalecache
        self.realcache = realcache
        self.caches = (localcache, realcache)  # for the other
                                               # CacheChain machinery
        self.stats = None
        self.check_keys = check_keys

    @cache_timer_decorator("get")
    def get(self, key, default=None, stale=False, **kw):
        if kw.get('allow_local', True) and key in self.localcache:
            return self.localcache[key]

        if stale:
            stale_value = self._getstale([key]).get(key, None)
            if stale_value is not None:
                if self.stats:
                    self.stats.cache_hit()
                    self.stats.stale_hit()
                return stale_value  # never put stale data into the
                                    # LocalCache, or people that didn't
                                    # say they'll take stale data may
                                    # get it
            else:
                self.stats.stale_miss()

        value = self.realcache.get(key)
        if value is None:
            if self.stats:
                self.stats.cache_miss()
            return default

        if stale:
            self.stalecache.set(key, value, time=self.staleness)

        self.localcache.set(key, value)

        if self.stats:
            self.stats.cache_hit()

        return value

    @cache_timer_decorator("get_multi")
    def simple_get_multi(self, keys, stale=False, stat_subname=None, **kw):
        if not isinstance(keys, set):
            keys = set(keys)

        ret = {}
        local_hits = 0

        if kw.get('allow_local'):
            for k in list(keys):
                if k in self.localcache:
                    ret[k] = self.localcache[k]
                    keys.remove(k)
                    local_hits += 1

        if keys and stale:
            stale_values = self._getstale(keys)
            # never put stale data into the localcache
            for k, v in stale_values.iteritems():
                ret[k] = v
                keys.remove(k)

            stale_hits = len(stale_values)
            stale_misses = len(keys)
            if self.stats:
                self.stats.stale_hit(stale_hits, subname=stat_subname)
                self.stats.stale_miss(stale_misses, subname=stat_subname)

        if keys:
            values = self.realcache.simple_get_multi(keys)
            if values and stale:
                self.stalecache.set_multi(values, time=self.staleness)
            self.localcache.update(values)
            ret.update(values)

        if self.stats:
            misses = len(keys - set(ret.keys()))
            hits = len(ret) - local_hits
            self.stats.cache_hit(hits, subname=stat_subname)
            self.stats.cache_miss(misses, subname=stat_subname)

        return ret

    def _getstale(self, keys):
        # this is only in its own function to make tapping it for
        # debugging easier
        return self.stalecache.simple_get_multi(keys)

    def reset(self):
        newcache = self.localcache.__class__()
        self.localcache = newcache
        self.caches = (newcache,) + self.caches[1:]
        if isinstance(self.realcache, CacheChain):
            assert isinstance(self.realcache.caches[0], LocalCache)
            self.realcache.caches = (newcache,) + self.realcache.caches[1:]

    def __repr__(self):
        return '<%s %r>' % (self.__class__.__name__,
                            (self.localcache, self.stalecache,
                             self.realcache))
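
# A minimal sketch of stale reads (the constituent caches are hypothetical):
# callers that pass stale=True may be served from the short-TTL stalecache
# without consulting the authoritative pool; see test_stale() below for a
# fuller exercise against g.cache.
#
#     chain = StaleCacheChain(LocalCache(), stale_mc_chain, real_mc_chain)
#     chain.get('key', stale=True)   # may be up to `staleness` seconds old
#     chain.get('key', stale=False)  # always consults realcache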

CL_ONE = ConsistencyLevel.ONE
CL_QUORUM = ConsistencyLevel.QUORUM

class Permacache(object):
    """Cassandra key/value column family backend with a cachechain in front.

    Probably best to not think of this as a cache but rather as a key/value
    datastore that's faster to access than cassandra because of the cache.

    """

    COLUMN_NAME = 'value'

    def __init__(self, cache_chain, column_family, lock_factory):
        self.cache_chain = cache_chain
        self.make_lock = lock_factory
        self.cf = column_family

    @classmethod
    def _setup_column_family(cls, column_family_name, client):
        cf = ColumnFamily(client, column_family_name,
                          read_consistency_level=CL_QUORUM,
                          write_consistency_level=CL_QUORUM)
        return cf

    def _backend_get(self, keys):
        keys, is_single = tup(keys, ret_is_single=True)
        rows = self.cf.multiget(keys, columns=[self.COLUMN_NAME])
        ret = {
            key: pickle.loads(columns[self.COLUMN_NAME])
            for key, columns in rows.iteritems()
        }

        if is_single:
            if ret:
                return ret.values()[0]
            else:
                return None
        else:
            return ret

    def _backend_set(self, key, val):
        keys = {key: val}
        ret = self._backend_set_multi(keys)
        return ret.get(key)

    def _backend_set_multi(self, keys, prefix=''):
        ret = {}
        with self.cf.batch():
            for key, val in keys.iteritems():
                rowkey = "%s%s" % (prefix, key)
                column = {self.COLUMN_NAME: pickle.dumps(val, protocol=2)}
                ret[key] = self.cf.insert(rowkey, column)
        return ret

    def _backend_delete(self, key):
        self.cf.remove(key)

    def get(self, key, default=None, allow_local=True, stale=False):
        val = self.cache_chain.get(
            key, default=None, allow_local=allow_local, stale=stale)

        if val is None:
            val = self._backend_get(key)
            if val:
                self.cache_chain.set(key, val)
            else:
                return default
        return val

    def set(self, key, val):
        self._backend_set(key, val)
        self.cache_chain.set(key, val)

    def set_multi(self, keys, prefix='', time=None):
        # time is sent by sgm but will be ignored
        self._backend_set_multi(keys, prefix=prefix)
        self.cache_chain.set_multi(keys, prefix=prefix)

    def pessimistically_set(self, key, value):
        """
        Sets a value in Cassandra but instead of setting it in memcached,
        deletes it from there instead. This is useful for the mr_top job which
        sets thousands of keys but almost all of them will never be read out of
        the cache.
        """
        self._backend_set(key, value)
        self.cache_chain.delete(key)

    def get_multi(self, keys, prefix='', allow_local=True, stale=False):
        call_fn = lambda k: self.simple_get_multi(k, allow_local=allow_local,
                                                  stale=stale)
        return prefix_keys(keys, prefix, call_fn)

    def simple_get_multi(self, keys, allow_local=True, stale=False):
        ret = self.cache_chain.simple_get_multi(
            keys, allow_local=allow_local, stale=stale)
        still_need = {key for key in keys if key not in ret}
        if still_need:
            from_cass = self._backend_get(still_need)
            self.cache_chain.set_multi(from_cass)
            ret.update(from_cass)
        return ret

    def delete(self, key):
        self._backend_delete(key)
        self.cache_chain.delete(key)

    def mutate(self, key, mutation_fn, default=None, willread=True):
        """Mutate a Cassandra key as atomically as possible"""
        with self.make_lock("permacache_mutate", "mutate_%s" % key):
            # This has an edge-case where the cache chain was populated by a
            # ONE read rather than a QUORUM one just before running this. All
            # reads should use consistency level QUORUM.
            if willread:
                value = self.cache_chain.get(key, allow_local=False)
                if value is None:
                    value = self._backend_get(key)
            else:
                value = None

            # send in a copy in case they mutate it in-place
            new_value = mutation_fn(copy(value))

            if not willread or value != new_value:
                self._backend_set(key, new_value)
                self.cache_chain.set(key, new_value, use_timer=False)
            return new_value

    def __repr__(self):
        return '<%s %r %r>' % (self.__class__.__name__,
                               self.cache_chain, self.cf.column_family)
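
# A minimal sketch of Permacache.mutate (the key and the stored list are
# hypothetical): the mutation function receives a copy of the current value
# and returns the replacement, all while the distributed lock is held.
#
#     def add_item(items):
#         items = items or []
#         items.append('new item')
#         return items
#
#     permacache.mutate('some_list_key', add_item)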

def test_cache(cache, prefix=''):
    # basic set/get
    cache.set('%s1' % prefix, 1)
    assert cache.get('%s1' % prefix) == 1

    # python data
    cache.set('%s2' % prefix, [1, 2, 3])
    assert cache.get('%s2' % prefix) == [1, 2, 3]

    # set multi, no prefix
    cache.set_multi({'%s3' % prefix: 3, '%s4' % prefix: 4})
    assert cache.get_multi(('%s3' % prefix, '%s4' % prefix)) == {'%s3' % prefix: 3,
                                                                 '%s4' % prefix: 4}

    # set multi, prefix
    cache.set_multi({'3': 3, '4': 4}, prefix='%sp_' % prefix)
    assert cache.get_multi(('3', 4), prefix='%sp_' % prefix) == {'3': 3, 4: 4}
    assert cache.get_multi(('%sp_3' % prefix, '%sp_4' % prefix)) == {'%sp_3' % prefix: 3,
                                                                     '%sp_4' % prefix: 4}

    # delete
    cache.set('%s1' % prefix, 1)
    assert cache.get('%s1' % prefix) == 1
    cache.delete('%s1' % prefix)
    assert cache.get('%s1' % prefix) is None

    cache.set('%s1' % prefix, 1)
    cache.set('%s2' % prefix, 2)
    cache.set('%s3' % prefix, 3)
    assert cache.get('%s1' % prefix) == 1 and cache.get('%s2' % prefix) == 2
    cache.delete_multi(['%s1' % prefix, '%s2' % prefix])
    assert (cache.get('%s1' % prefix) is None
            and cache.get('%s2' % prefix) is None
            and cache.get('%s3' % prefix) == 3)

    # incr
    cache.set('%s5' % prefix, 1)
    cache.set('%s6' % prefix, 1)
    cache.incr('%s5' % prefix)
    assert cache.get('%s5' % prefix) == 2
    cache.incr('%s5' % prefix, 2)
    assert cache.get('%s5' % prefix) == 4
    cache.incr_multi(('%s5' % prefix, '%s6' % prefix), 1)
    assert cache.get('%s5' % prefix) == 5
    assert cache.get('%s6' % prefix) == 2

def test_multi(cache):
    from threading import Thread

    num_threads = 100
    num_per_thread = 1000

    threads = []
    for x in range(num_threads):
        def _fn(prefix):
            def __fn():
                for y in range(num_per_thread):
                    test_cache(cache, prefix=prefix)
            return __fn
        t = Thread(target=_fn(str(x)))
        t.start()
        threads.append(t)

    for thread in threads:
        thread.join()

# a cache that occasionally dumps itself to be used for long-running
# processes
class SelfEmptyingCache(LocalCache):
    def __init__(self, max_size=10 * 1000):
        self.max_size = max_size

    def maybe_reset(self):
        if len(self) > self.max_size:
            self.clear()

    def set(self, key, val, time=0):
        self.maybe_reset()
        return LocalCache.set(self, key, val, time)

    def add(self, key, val, time=0):
        self.maybe_reset()
        return LocalCache.add(self, key, val)

def _make_hashable(s):
    if isinstance(s, str):
        return s
    elif isinstance(s, unicode):
        return s.encode('utf-8')
    elif isinstance(s, (tuple, list)):
        return ','.join(_make_hashable(x) for x in s)
    elif isinstance(s, dict):
        return ','.join('%s:%s' % (_make_hashable(k), _make_hashable(v))
                        for (k, v) in sorted(s.iteritems()))
    else:
        return str(s)


def make_key_id(*a, **kw):
    h = md5()
    h.update(_make_hashable(a))
    h.update(_make_hashable(kw))
    return h.hexdigest()

def make_key(iden, *a, **kw):
    """
    A helper function for making memcached-usable cache keys out of
    arbitrary arguments. Hashes the arguments but leaves the `iden'
    human-readable.
    """
    h = md5()
    iden = _make_hashable(iden)
    h.update(iden)
    h.update(_make_hashable(a))
    h.update(_make_hashable(kw))

    return '%s(%s)' % (iden, h.hexdigest())
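
# For example (the arguments are hypothetical), the iden stays readable
# while everything else is hashed:
#
#     make_key('comments', 1234, sort='top')
#     # -> 'comments(<32-character md5 hexdigest>)'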

def test_stale():
    from pylons import app_globals as g
    ca = g.cache
    assert isinstance(ca, StaleCacheChain)

    ca.localcache.clear()

    ca.stalecache.set('foo', 'bar', time=ca.staleness)
    assert ca.stalecache.get('foo') == 'bar'
    ca.realcache.set('foo', 'baz')
    assert ca.realcache.get('foo') == 'baz'

    assert ca.get('foo', stale=True) == 'bar'
    ca.localcache.clear()
    assert ca.get('foo', stale=False) == 'baz'
    ca.localcache.clear()

    assert ca.get_multi(['foo'], stale=True) == {'foo': 'bar'}
    assert len(ca.localcache) == 0
    assert ca.get_multi(['foo'], stale=False) == {'foo': 'baz'}
    ca.localcache.clear()