1 # The contents of this file are subject to the Common Public Attribution
2 # License Version 1.0. (the "License"); you may not use this file except in
3 # compliance with the License. You may obtain a copy of the License at
4 # http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
5 # License Version 1.1, but Sections 14 and 15 have been added to cover use of
6 # software over a computer network and provide for limited attribution for the
7 # Original Developer. In addition, Exhibit A has been modified to be consistent
10 # Software distributed under the License is distributed on an "AS IS" basis,
11 # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
12 # the specific language governing rights and limitations under the License.
14 # The Original Code is reddit.
16 # The Original Developer is the Initial Developer. The Initial Developer of
17 # the Original Code is reddit Inc.
19 # All portions of the code written by reddit are Copyright (c) 2006-2015 reddit
20 # Inc. All Rights Reserved.
21 ###############################################################################
24 from threading
import local
25 from hashlib
import md5
26 import cPickle
as pickle
28 from curses
.ascii
import isgraph
30 from time
import sleep
32 from pylons
import app_globals
as g
35 from _pylibmc
import MemcachedError
39 from pycassa
import ColumnFamily
40 from pycassa
.cassandra
.ttypes
import ConsistencyLevel
42 from r2
.lib
.utils
import in_chunks
, prefix_keys
, trace
, tup
43 from r2
.lib
.hardcachebackend
import HardCacheBackend
45 from r2
.lib
.sgm
import sgm
# get this into our namespace so that it's
# Registry of every cache server address in use; this is for use in the
# health controller.
_CACHE_SERVERS = set()
class NoneResult(object):
    """Sentinel cached in place of None so that a cached negative result
    can be told apart from a plain cache miss."""
55 class CacheUtils(object):
56 # Caches that never expire entries should set this to true, so that
57 # CacheChain can properly count hits and misses.
60 def incr_multi(self
, keys
, delta
=1, prefix
=''):
63 self
.incr(prefix
+ k
, delta
)
def add_multi(self, keys, prefix='', time=0):
    """add() every (key, value) pair of the `keys` dict, namespacing each
    key with `prefix`."""
    for key, value in keys.iteritems():
        self.add(prefix + str(key), value, time=time)
def get_multi(self, keys, prefix='', **kw):
    """Fetch `keys` through simple_get_multi, letting prefix_keys add the
    `prefix` on the way in and strip it from the returned dict."""
    def fetch(prefixed_keys):
        return self.simple_get_multi(prefixed_keys, **kw)
    return prefix_keys(keys, prefix, fetch)
75 class MemcachedValueSizeException(Exception):
76 def __init__(self
, cache_name
, caller
, prefix
, key
, size
):
79 self
.cache_name
= cache_name
84 return ("Memcached %s %s: The object for key '%s%s' is too big for memcached at %s bytes" %
85 (self
.cache_name
, self
.caller
, self
.prefix
, self
.key
, self
.size
))
# validation functions to be used by memcached pools
MEMCACHED_MAX_VALUE_SIZE = 2048 * 1024  # 2MB


def is_valid_size_for_cache(obj):
    """Return True when `obj` is small enough to store in memcached.

    NOTE: only the memory consumption directly attributed to the object is
    accounted for, not the memory consumption of objects it refers to.
    For instance a tuple of strings will only appear to be the size of the
    tuple itself.
    """
    shallow_size = sys.getsizeof(obj)
    return shallow_size < MEMCACHED_MAX_VALUE_SIZE
100 def validate_size_warn(**kwargs
):
101 if 'value' in kwargs
:
102 if not is_valid_size_for_cache(kwargs
["value"]):
104 "memcached_large_object",
105 kwargs
.get("cache_name", "undefined")
107 g
.stats
.simple_event(key
)
109 "Memcached %s: Attempted to cache an object > 1MB at key: '%s%s' of size %s bytes",
110 kwargs
.get("caller", "unknown"),
111 kwargs
.get("prefix", ""),
112 kwargs
.get("key", "undefined"),
113 sys
.getsizeof(kwargs
["value"])
119 def validate_size_error(**kwargs
):
120 if 'value' in kwargs
:
121 if not is_valid_size_for_cache(kwargs
["value"]):
122 raise MemcachedValueSizeException(
123 kwargs
.get("cache_name", "unknown"),
124 kwargs
.get("caller", "unknown"),
125 kwargs
.get("prefix", ""),
126 kwargs
.get("key", "undefined"),
127 sys
.getsizeof(kwargs
["value"])
132 class CMemcache(CacheUtils
):
139 min_compress_len
=512 * 1024,
144 self
.servers
= servers
145 self
.clients
= pylibmc
.ClientPool(n_slots
= num_clients
)
146 self
.validators
= validators
or []
148 for x
in xrange(num_clients
):
149 client
= pylibmc
.Client(servers
, binary
=binary
)
151 'no_block': no_block
, # use async I/O
152 'tcp_nodelay': True, # no nagle
153 '_noreply': int(noreply
),
154 'ketama': True, # consistent hashing
157 behaviors
['verify_keys'] = True
159 client
.behaviors
.update(behaviors
)
160 self
.clients
.put(client
)
162 self
.min_compress_len
= min_compress_len
164 _CACHE_SERVERS
.update(servers
)
166 def validate(self
, **kwargs
):
167 kwargs
['caller'] = sys
._getframe
().f_back
.f_code
.co_name
168 kwargs
['cache_name'] = self
.name
169 if not all(validator(**kwargs
) for validator
in self
.validators
):
174 def get(self
, key
, default
= None):
175 if not self
.validate(key
=key
):
178 with self
.clients
.reserve() as mc
:
179 ret
= mc
.get(str(key
))
def get_multi(self, keys, prefix=''):
    """Fetch several keys at once, silently dropping any key that fails
    validation before it reaches memcached."""
    wanted = []
    for key in keys:
        key = str(key)
        if self.validate(prefix=prefix, key=key):
            wanted.append(key)

    with self.clients.reserve() as client:
        return client.get_multi(wanted, key_prefix=prefix)
# simple_get_multi exists so that a cache chain can single-instance the
# handling of prefixes for performance, but pylibmc does that in C, which
# is faster anyway, so CMemcache implements get_multi itself. The
# CacheChain still wants simple_get_multi available for the case where it
# has already prefixed the keys, so alias it here.
simple_get_multi = get_multi
199 def set(self
, key
, val
, time
= 0):
200 if not self
.validate(key
=key
, value
=val
):
203 with self
.clients
.reserve() as mc
:
204 return mc
.set(str(key
), val
, time
=time
,
205 min_compress_len
= self
.min_compress_len
)
207 def set_multi(self
, keys
, prefix
='', time
=0):
208 str_keys
= ((str(k
), v
) for k
, v
in keys
.iteritems())
209 validated_keys
= {k
: v
for k
, v
in str_keys
210 if self
.validate(prefix
=prefix
, key
=k
, value
=v
)}
212 with self
.clients
.reserve() as mc
:
213 return mc
.set_multi(validated_keys
, key_prefix
= prefix
,
215 min_compress_len
= self
.min_compress_len
)
217 def add_multi(self
, keys
, prefix
='', time
=0):
218 str_keys
= ((str(k
), v
) for k
, v
in keys
.iteritems())
219 validated_keys
= {k
: v
for k
, v
in str_keys
220 if self
.validate(prefix
=prefix
, key
=k
, value
=v
)}
222 with self
.clients
.reserve() as mc
:
223 return mc
.add_multi(validated_keys
, key_prefix
= prefix
,
226 def incr_multi(self
, keys
, prefix
='', delta
=1):
227 validated_keys
= [k
for k
in (str(k
) for k
in keys
)
228 if self
.validate(prefix
=prefix
, key
=k
)]
230 with self
.clients
.reserve() as mc
:
231 return mc
.incr_multi(validated_keys
,
235 def append(self
, key
, val
, time
=0):
236 if not self
.validate(key
=key
, value
=val
):
239 with self
.clients
.reserve() as mc
:
240 return mc
.append(str(key
), val
, time
=time
)
242 def incr(self
, key
, delta
=1, time
=0):
243 if not self
.validate(key
=key
):
246 # ignore the time on these
247 with self
.clients
.reserve() as mc
:
248 return mc
.incr(str(key
), delta
)
250 def add(self
, key
, val
, time
=0):
251 if not self
.validate(key
=key
, value
=val
):
255 with self
.clients
.reserve() as mc
:
256 return mc
.add(str(key
), val
, time
=time
)
257 except pylibmc
.DataExists
:
260 def delete(self
, key
, time
=0):
261 if not self
.validate(key
=key
):
264 with self
.clients
.reserve() as mc
:
265 return mc
.delete(str(key
))
def delete_multi(self, keys, prefix=''):
    """Delete several keys at once, silently dropping any key that fails
    validation before it reaches memcached."""
    deletable = []
    for key in keys:
        key = str(key)
        if self.validate(prefix=prefix, key=key):
            deletable.append(key)

    with self.clients.reserve() as client:
        return client.delete_multi(deletable, key_prefix=prefix)
275 return '<%s(%r)>' % (self
.__class
__.__name
__,
278 class HardCache(CacheUtils
):
282 def __init__(self
, gc
):
283 self
.backend
= HardCacheBackend(gc
)
285 def _split_key(self
, key
):
286 tokens
= key
.split("-", 1)
288 raise ValueError("key %s has no dash" % key
)
290 category
, ids
= tokens
293 def set(self
, key
, val
, time
=0):
294 if val
== NoneResult
:
295 # NoneResult caching is for other parts of the chain
298 category
, ids
= self
._split
_key
(key
)
299 self
.backend
.set(category
, ids
, val
, time
)
301 def simple_get_multi(self
, keys
):
303 category_bundles
= {}
305 category
, ids
= self
._split
_key
(key
)
306 category_bundles
.setdefault(category
, []).append(ids
)
308 for category
in category_bundles
:
309 idses
= category_bundles
[category
]
310 chunks
= in_chunks(idses
, size
=50)
312 new_results
= self
.backend
.get_multi(category
, chunk
)
313 results
.update(new_results
)
317 def set_multi(self
, keys
, prefix
='', time
=0):
318 for k
,v
in keys
.iteritems():
320 self
.set(prefix
+str(k
), v
, time
=time
)
322 def get(self
, key
, default
=None):
323 category
, ids
= self
._split
_key
(key
)
324 r
= self
.backend
.get(category
, ids
)
325 if r
is None: return default
def delete(self, key, time=0):
    """Drop `key` from the hardcache backend (`time` is accepted for
    interface parity with the other caches and ignored)."""
    # Potential optimization: when on a negative-result caching chain,
    # shove NoneResult throughout the chain when a key is deleted.
    cat, ident = self._split_key(key)
    self.backend.delete(cat, ident)
def add(self, key, value, time=0):
    """Store `value` in the backend under the split form of `key`,
    returning whatever the backend's add() returns."""
    cat, ident = self._split_key(key)
    return self.backend.add(cat, ident, value, time=time)
def incr(self, key, delta=1, time=0):
    """Increment the backend value for `key` by `delta`, returning the
    backend's result."""
    cat, ident = self._split_key(key)
    return self.backend.incr(cat, ident, delta=delta, time=time)
343 class LocalCache(dict, CacheUtils
):
def __init__(self, *a, **kw):
    """Initialize exactly like a plain dict."""
    dict.__init__(self, *a, **kw)
def _check_key(self, key):
    """Raise TypeError unless `key` is a str; unicode keys are coerced
    with str() first (which fails for non-ASCII)."""
    if isinstance(key, unicode):
        key = str(key)  # try to convert it first
    if isinstance(key, str):
        return
    raise TypeError('Key is not a string: %r' % (key,))
353 def get(self
, key
, default
=None):
354 r
= dict.get(self
, key
)
355 if r
is None: return default
358 def simple_get_multi(self
, keys
):
365 def set(self
, key
, val
, time
= 0):
366 # time is ignored on localcache
def set_multi(self, keys, prefix='', time=0):
    """set() every (key, value) pair of the `keys` dict, namespacing each
    key with `prefix`."""
    for key, value in keys.iteritems():
        self.set(prefix + str(key), value, time=time)
374 def add(self
, key
, val
, time
= 0):
377 self
.setdefault(key
, val
)
380 def delete(self
, key
):
381 if self
.has_key(key
):
384 def delete_multi(self
, keys
):
386 if self
.has_key(key
):
def incr(self, key, delta=1, time=0):
    """Increase the int value stored at `key` by `delta`; a no-op when the
    key is absent (`time` is accepted for interface parity and ignored)."""
    # `in` replaces the deprecated dict.has_key(); behavior is identical.
    if key in self:
        self[key] = int(self[key]) + delta
def decr(self, key, amt=1):
    """Decrease the int value stored at `key` by `amt`; a no-op when the
    key is absent."""
    # `in` replaces the deprecated dict.has_key(); behavior is identical.
    if key in self:
        self[key] = int(self[key]) - amt
def append(self, key, val, time=0):
    """Append the string `val` to the stringified value at `key`; a no-op
    when the key is absent (`time` is accepted for parity and ignored)."""
    # `in` replaces the deprecated dict.has_key(); behavior is identical.
    if key in self:
        self[key] = str(self[key]) + val
def prepend(self, key, val, time=0):
    """Prepend the string `val` to the stringified value at `key`; a no-op
    when the key is absent (`time` is accepted for parity and ignored)."""
    # `in` replaces the deprecated dict.has_key(); behavior is identical.
    if key in self:
        self[key] = val + str(self[key])
405 def replace(self
, key
, val
, time
= 0):
406 if self
.has_key(key
):
413 return "<LocalCache(%d)>" % (len(self
),)
416 class TransitionalCache(CacheUtils
):
417 """A cache "chain" for moving keys to a new cluster live.
419 `original_cache` is the cache chain previously in use (this'll frequently
420 be `g.cache` since it's the catch-all for most things) and
421 `replacement_cache` is the new place for the keys using this chain to
422 live. `key_transform` is an optional function to translate the key names
423 into different names on the `replacement_cache`.
425 To use this cache chain, do three separate deployments as follows:
427 * start dual-writing to the new pool by putting this chain in place
428 with `read_original=True`.
429 * cut reads over to the new pool after it is sufficiently heated up by
430 deploying `read_original=False`.
431 * remove this cache chain entirely and replace it with
434 This ensures that at any point, all apps regardless of their position in
435 the push order will have a consistent view of the data in the cache pool as
441 self
, original_cache
, replacement_cache
, read_original
,
443 self
.original
= original_cache
444 self
.replacement
= replacement_cache
445 self
.read_original
= read_original
446 self
.key_transform
= key_transform
450 if self
.read_original
:
451 return self
.original
.stats
453 return self
.replacement
.stats
456 def read_chain(self
):
457 if self
.read_original
:
460 return self
.replacement
464 if self
.read_original
:
465 return self
.original
.caches
467 return self
.replacement
.caches
469 def transform_memcache_key(self
, args
, kwargs
):
470 if self
.key_transform
:
472 prefix
= kwargs
.get("prefix", "")
474 if isinstance(old_key
, dict): # multiget passes a dict
476 self
.key_transform(k
, prefix
): v
477 for k
, v
in old_key
.iteritems()
479 elif isinstance(old_key
, list):
480 new_key
= [self
.key_transform(k
, prefix
) for k
in old_key
]
482 new_key
= self
.key_transform(old_key
, prefix
)
484 return (new_key
,) + args
[1:]
488 def make_get_fn(fn_name
):
489 def transitional_cache_get_fn(self
, *args
, **kwargs
):
490 if self
.read_original
:
491 return getattr(self
.original
, fn_name
)(*args
, **kwargs
)
493 args
= self
.transform_memcache_key(args
, kwargs
)
495 # remove the prefix argument because the transform fn has
496 # already tacked that onto the keys
502 return getattr(self
.replacement
, fn_name
)(*args
, **kwargs
)
503 return transitional_cache_get_fn
# Read-path operations, all generated from the same template.
get = make_get_fn("get")
get_multi = make_get_fn("get_multi")
simple_get_multi = make_get_fn("simple_get_multi")
509 def make_set_fn(fn_name
):
510 def transitional_cache_set_fn(self
, *args
, **kwargs
):
511 ret_original
= getattr(self
.original
, fn_name
)(*args
, **kwargs
)
512 args
= self
.transform_memcache_key(args
, kwargs
)
514 # remove the prefix argument because the transform fn has
515 # already tacked that onto the keys
521 ret_replacement
= getattr(self
.replacement
, fn_name
)(*args
, **kwargs
)
522 if self
.read_original
:
525 return ret_replacement
526 return transitional_cache_set_fn
# Write-path operations, all generated from the same template; each one
# writes to both the original and replacement chains via make_set_fn.
# (The original code assigned `add` twice — lines 528 and 534 — the
# redundant duplicate is dropped here; the bound value was identical.)
add = make_set_fn("add")
set = make_set_fn("set")
append = make_set_fn("append")
prepend = make_set_fn("prepend")
replace = make_set_fn("replace")
set_multi = make_set_fn("set_multi")
add_multi = make_set_fn("add_multi")
incr = make_set_fn("incr")
incr_multi = make_set_fn("incr_multi")
decr = make_set_fn("decr")
delete = make_set_fn("delete")
delete_multi = make_set_fn("delete_multi")
flush_all = make_set_fn("flush_all")
544 def cache_timer_decorator(fn_name
):
545 """Use to decorate CacheChain operations so timings will be recorded."""
547 def timed_fn(self
, *a
, **kw
):
548 use_timer
= kw
.pop("use_timer", True)
553 # don't have access to g, maybe in a thread?
554 return fn(self
, *a
, **kw
)
556 if use_timer
and self
.stats
:
557 publish
= random
.random() < g
.stats
.CACHE_SAMPLE_RATE
558 cache_name
= self
.stats
.cache_name
559 timer_name
= "cache.%s.%s" % (cache_name
, fn_name
)
560 timer
= g
.stats
.get_timer(timer_name
, publish
)
565 result
= fn(self
, *a
, **kw
)
574 class CacheChain(CacheUtils
, local
):
575 def __init__(self
, caches
, cache_negative_results
=False,
578 self
.cache_negative_results
= cache_negative_results
580 self
.check_keys
= check_keys
582 def make_set_fn(fn_name
):
583 @cache_timer_decorator(fn_name
)
584 def fn(self
, *a
, **kw
):
586 for c
in self
.caches
:
587 ret
= getattr(c
, fn_name
)(*a
, **kw
)
591 # note that because of the naive nature of `add' when used on a
592 # cache chain, its return value isn't reliable. if you need to
593 # verify its return value you'll either need to make it smarter or
594 # use the underlying cache directly
595 add
= make_set_fn('add')
597 set = make_set_fn('set')
598 append
= make_set_fn('append')
599 prepend
= make_set_fn('prepend')
600 replace
= make_set_fn('replace')
601 set_multi
= make_set_fn('set_multi')
602 add
= make_set_fn('add')
603 add_multi
= make_set_fn('add_multi')
604 incr
= make_set_fn('incr')
605 incr_multi
= make_set_fn('incr_multi')
606 decr
= make_set_fn('decr')
607 delete
= make_set_fn('delete')
608 delete_multi
= make_set_fn('delete_multi')
609 flush_all
= make_set_fn('flush_all')
610 cache_negative_results
= False
612 @cache_timer_decorator("get")
613 def get(self
, key
, default
= None, allow_local
= True, stale
=None):
614 stat_outcome
= False # assume a miss until a result is found
615 is_localcache
= False
617 for c
in self
.caches
:
618 is_localcache
= isinstance(c
, LocalCache
)
619 if not allow_local
and is_localcache
:
629 for d
in self
.caches
:
631 break # so we don't set caches later in the chain
634 if val
== NoneResult
:
639 if self
.cache_negative_results
:
640 for c
in self
.caches
[:-1]:
641 c
.set(key
, NoneResult
)
647 if not is_localcache
:
648 self
.stats
.cache_hit()
650 self
.stats
.cache_miss()
652 def get_multi(self
, keys
, prefix
='', allow_local
= True, **kw
):
653 l
= lambda ks
: self
.simple_get_multi(ks
, allow_local
= allow_local
, **kw
)
654 return prefix_keys(keys
, prefix
, l
)
656 @cache_timer_decorator("get_multi")
657 def simple_get_multi(self
, keys
, allow_local
= True, stale
=None,
664 for c
in self
.caches
:
665 is_localcache
= isinstance(c
, LocalCache
)
666 if not allow_local
and is_localcache
:
669 if c
.permanent
and not misses
:
670 # Once we reach a "permanent" cache, we count any outstanding
674 if len(out
) == len(keys
):
675 # we've found them all
678 r
= c
.simple_get_multi(need
)
683 elif not c
.permanent
:
686 for d
in self
.caches
:
688 break # so we don't set caches later in the chain
692 need
= need
- set(r
.keys())
694 if need
and self
.cache_negative_results
:
695 d
= dict((key
, NoneResult
) for key
in need
)
696 for c
in self
.caches
[:-1]:
700 for (k
, v
) in out
.iteritems()
705 # If this chain contains no permanent caches, then we need to
706 # count the misses here.
708 self
.stats
.cache_hit(hits
, subname
=stat_subname
)
709 self
.stats
.cache_miss(misses
, subname
=stat_subname
)
714 return '<%s %r>' % (self
.__class
__.__name
__,
717 def debug(self
, key
):
718 print "Looking up [%r]" % key
719 for i
, c
in enumerate(self
.caches
):
720 print "[%d] %10s has value [%r]" % (i
, c
.__class
__.__name
__,
724 # the first item in a cache chain is a LocalCache
725 self
.caches
= (self
.caches
[0].__class
__(),) + self
.caches
[1:]
727 class MemcacheChain(CacheChain
):
730 class HardcacheChain(CacheChain
):
731 def add(self
, key
, val
, time
=0):
732 authority
= self
.caches
[-1] # the authority is the hardcache
734 added_val
= authority
.add(key
, val
, time
=time
)
735 for cache
in self
.caches
[:-1]:
736 # Calling set() rather than add() to ensure that all caches are
737 # in sync and that de-syncs repair themselves
738 cache
.set(key
, added_val
, time
=time
)
742 def accrue(self
, key
, time
=0, delta
=1):
743 auth_value
= self
.caches
[-1].get(key
)
745 if auth_value
is None:
749 auth_value
= int(auth_value
) + delta
751 raise ValueError("Can't accrue %s; it's a %s (%r)" %
752 (key
, auth_value
.__class
__.__name
__, auth_value
))
754 for c
in self
.caches
:
755 c
.set(key
, auth_value
, time
=time
)
761 # the hardcache is always the last item in a HardCacheChain
762 return self
.caches
[-1].backend
764 class StaleCacheChain(CacheChain
):
765 """A cache chain of two cache chains. When allowed by `stale`,
766 answers may be returned by a "closer" but potentially older
767 cache. Probably doesn't play well with NoneResult cacheing"""
770 def __init__(self
, localcache
, stalecache
, realcache
, check_keys
=True):
771 self
.localcache
= localcache
772 self
.stalecache
= stalecache
773 self
.realcache
= realcache
774 self
.caches
= (localcache
, realcache
) # for the other
775 # CacheChain machinery
777 self
.check_keys
= check_keys
779 @cache_timer_decorator("get")
780 def get(self
, key
, default
=None, stale
= False, **kw
):
781 if kw
.get('allow_local', True) and key
in self
.localcache
:
782 return self
.localcache
[key
]
785 stale_value
= self
._getstale
([key
]).get(key
, None)
786 if stale_value
is not None:
788 self
.stats
.cache_hit()
789 self
.stats
.stale_hit()
790 return stale_value
# never return stale data into the
791 # LocalCache, or people that didn't
792 # say they'll take stale data may
795 self
.stats
.stale_miss()
797 value
= self
.realcache
.get(key
)
800 self
.stats
.cache_miss()
804 self
.stalecache
.set(key
, value
, time
=self
.staleness
)
806 self
.localcache
.set(key
, value
)
809 self
.stats
.cache_hit()
813 @cache_timer_decorator("get_multi")
814 def simple_get_multi(self
, keys
, stale
=False, stat_subname
=None, **kw
):
815 if not isinstance(keys
, set):
821 if kw
.get('allow_local'):
823 if k
in self
.localcache
:
824 ret
[k
] = self
.localcache
[k
]
829 stale_values
= self
._getstale
(keys
)
830 # never put stale data into the localcache
831 for k
, v
in stale_values
.iteritems():
835 stale_hits
= len(stale_values
)
836 stale_misses
= len(keys
)
838 self
.stats
.stale_hit(stale_hits
, subname
=stat_subname
)
839 self
.stats
.stale_miss(stale_misses
, subname
=stat_subname
)
842 values
= self
.realcache
.simple_get_multi(keys
)
844 self
.stalecache
.set_multi(values
, time
=self
.staleness
)
845 self
.localcache
.update(values
)
849 misses
= len(keys
- set(ret
.keys()))
850 hits
= len(ret
) - local_hits
851 self
.stats
.cache_hit(hits
, subname
=stat_subname
)
852 self
.stats
.cache_miss(misses
, subname
=stat_subname
)
856 def _getstale(self
, keys
):
857 # this is only in its own function to make tapping it for
859 return self
.stalecache
.simple_get_multi(keys
)
862 newcache
= self
.localcache
.__class
__()
863 self
.localcache
= newcache
864 self
.caches
= (newcache
,) + self
.caches
[1:]
865 if isinstance(self
.realcache
, CacheChain
):
866 assert isinstance(self
.realcache
.caches
[0], LocalCache
)
867 self
.realcache
.caches
= (newcache
,) + self
.realcache
.caches
[1:]
870 return '<%s %r>' % (self
.__class
__.__name
__,
871 (self
.localcache
, self
.stalecache
, self
.realcache
))
# Short aliases for the pycassa consistency levels used below.
CL_ONE = ConsistencyLevel.ONE
CL_QUORUM = ConsistencyLevel.QUORUM
877 class Permacache(object):
878 """Cassandra key/value column family backend with a cachechain in front.
880 Probably best to not think of this as a cache but rather as a key/value
881 datastore that's faster to access than cassandra because of the cache.
885 COLUMN_NAME
= 'value'
def __init__(self, cache_chain, column_family, lock_factory):
    """Wire up the cache chain fronting Cassandra, the backing column
    family, and the factory used to build mutation locks."""
    self.cf = column_family
    self.cache_chain = cache_chain
    self.make_lock = lock_factory
893 def _setup_column_family(cls
, column_family_name
, client
):
894 cf
= ColumnFamily(client
, column_family_name
,
895 read_consistency_level
=CL_QUORUM
,
896 write_consistency_level
=CL_QUORUM
)
899 def _backend_get(self
, keys
):
900 keys
, is_single
= tup(keys
, ret_is_single
=True)
901 rows
= self
.cf
.multiget(keys
, columns
=[self
.COLUMN_NAME
])
903 key
: pickle
.loads(columns
[self
.COLUMN_NAME
])
904 for key
, columns
in rows
.iteritems()
908 return ret
.values()[0]
914 def _backend_set(self
, key
, val
):
916 ret
= self
._backend
_set
_multi
(keys
)
919 def _backend_set_multi(self
, keys
, prefix
=''):
921 with self
.cf
.batch():
922 for key
, val
in keys
.iteritems():
923 rowkey
= "%s%s" % (prefix
, key
)
924 column
= {self
.COLUMN_NAME
: pickle
.dumps(val
, protocol
=2)}
925 ret
[key
] = self
.cf
.insert(rowkey
, column
)
928 def _backend_delete(self
, key
):
931 def get(self
, key
, default
=None, allow_local
=True, stale
=False):
932 val
= self
.cache_chain
.get(
933 key
, default
=None, allow_local
=allow_local
, stale
=stale
)
936 val
= self
._backend
_get
(key
)
938 self
.cache_chain
.set(key
, val
)
def set(self, key, val):
    """Write `val` to Cassandra first, then mirror it into the cache
    chain."""
    self._backend_set(key, val)
    self.cache_chain.set(key, val)
def set_multi(self, keys, prefix='', time=None):
    """Write every pair in the `keys` dict to Cassandra and then to the
    cache chain. `time` is sent by sgm but will be ignored."""
    self._backend_set_multi(keys, prefix=prefix)
    self.cache_chain.set_multi(keys, prefix=prefix)
def pessimistically_set(self, key, value):
    """Write `value` to Cassandra but *delete* the key from the cache
    chain instead of caching it there.

    Useful for the mr_top job, which sets thousands of keys of which
    almost none will ever be read back out of the cache.
    """
    self._backend_set(key, value)
    self.cache_chain.delete(key)
def get_multi(self, keys, prefix='', allow_local=True, stale=False):
    """Multi-key get: prefix_keys handles the `prefix` namespacing and
    delegates the actual lookup to simple_get_multi."""
    def call_fn(stripped_keys):
        return self.simple_get_multi(stripped_keys, allow_local=allow_local,
                                     stale=stale)
    return prefix_keys(keys, prefix, call_fn)
964 def simple_get_multi(self
, keys
, allow_local
=True, stale
=False):
965 ret
= self
.cache_chain
.simple_get_multi(
966 keys
, allow_local
=allow_local
, stale
=stale
)
967 still_need
= {key
for key
in keys
if key
not in ret
}
969 from_cass
= self
._backend
_get
(keys
)
970 self
.cache_chain
.set_multi(from_cass
)
971 ret
.update(from_cass
)
def delete(self, key):
    """Remove `key` from Cassandra and then from the cache chain."""
    self._backend_delete(key)
    self.cache_chain.delete(key)
978 def mutate(self
, key
, mutation_fn
, default
=None, willread
=True):
979 """Mutate a Cassandra key as atomically as possible"""
980 with self
.make_lock("permacache_mutate", "mutate_%s" % key
):
981 # This has an edge-case where the cache chain was populated by a ONE
982 # read rather than a QUORUM one just before running this. All reads
983 # should use consistency level QUORUM.
985 value
= self
.cache_chain
.get(key
, allow_local
=False)
987 value
= self
._backend
_get
(key
)
991 # send in a copy in case they mutate it in-place
992 new_value
= mutation_fn(copy(value
))
994 if not willread
or value
!= new_value
:
995 self
._backend
_set
(key
, new_value
)
996 self
.cache_chain
.set(key
, new_value
, use_timer
=False)
1000 return '<%s %r %r>' % (self
.__class
__.__name
__,
1001 self
.cache_chain
, self
.cf
.column_family
)
1004 def test_cache(cache
, prefix
=''):
1006 cache
.set('%s1' % prefix
, 1)
1007 assert cache
.get('%s1' % prefix
) == 1
1010 cache
.set('%s2' % prefix
, [1,2,3])
1011 assert cache
.get('%s2' % prefix
) == [1,2,3]
1013 #set multi, no prefix
1014 cache
.set_multi({'%s3' % prefix
:3, '%s4' % prefix
: 4})
1015 assert cache
.get_multi(('%s3' % prefix
, '%s4' % prefix
)) == {'%s3' % prefix
: 3,
1019 cache
.set_multi({'3':3, '4': 4}, prefix
='%sp_' % prefix
)
1020 assert cache
.get_multi(('3', 4), prefix
='%sp_' % prefix
) == {'3':3, 4: 4}
1021 assert cache
.get_multi(('%sp_3' % prefix
, '%sp_4' % prefix
)) == {'%sp_3'%prefix
: 3,
1025 cache
.set('%s1'%prefix
, 1)
1026 assert cache
.get('%s1'%prefix
) == 1
1027 cache
.delete('%s1'%prefix
)
1028 assert cache
.get('%s1'%prefix
) is None
1030 cache
.set('%s1'%prefix
, 1)
1031 cache
.set('%s2'%prefix
, 2)
1032 cache
.set('%s3'%prefix
, 3)
1033 assert cache
.get('%s1'%prefix
) == 1 and cache
.get('%s2'%prefix
) == 2
1034 cache
.delete_multi(['%s1'%prefix
, '%s2'%prefix
])
1035 assert (cache
.get('%s1'%prefix
) is None
1036 and cache
.get('%s2'%prefix
) is None
1037 and cache
.get('%s3'%prefix
) == 3)
1040 cache
.set('%s5'%prefix
, 1)
1041 cache
.set('%s6'%prefix
, 1)
1042 cache
.incr('%s5'%prefix
)
1043 assert cache
.get('%s5'%prefix
) == 2
1044 cache
.incr('%s5'%prefix
,2)
1045 assert cache
.get('%s5'%prefix
) == 4
1046 cache
.incr_multi(('%s5'%prefix
, '%s6'%prefix
), 1)
1047 assert cache
.get('%s5'%prefix
) == 5
1048 assert cache
.get('%s6'%prefix
) == 2
1050 def test_multi(cache
):
1051 from threading
import Thread
1054 num_per_thread
= 1000
1057 for x
in range(num_threads
):
1060 for y
in range(num_per_thread
):
1061 test_cache(cache
,prefix
=prefix
)
1063 t
= Thread(target
=_fn(str(x
)))
1067 for thread
in threads
:
1070 # a cache that occasionally dumps itself to be used for long-running
1072 class SelfEmptyingCache(LocalCache
):
1073 def __init__(self
, max_size
=10*1000):
1074 self
.max_size
= max_size
1076 def maybe_reset(self
):
1077 if len(self
) > self
.max_size
:
1080 def set(self
, key
, val
, time
=0):
1082 return LocalCache
.set(self
,key
,val
,time
)
1084 def add(self
, key
, val
, time
=0):
1086 return LocalCache
.add(self
, key
, val
)
1089 def _make_hashable(s
):
1090 if isinstance(s
, str):
1092 elif isinstance(s
, unicode):
1093 return s
.encode('utf-8')
1094 elif isinstance(s
, (tuple, list)):
1095 return ','.join(_make_hashable(x
) for x
in s
)
1096 elif isinstance(s
, dict):
1097 return ','.join('%s:%s' % (_make_hashable(k
), _make_hashable(v
))
1098 for (k
, v
) in sorted(s
.iteritems()))
1103 def make_key_id(*a
, **kw
):
1105 h
.update(_make_hashable(a
))
1106 h
.update(_make_hashable(kw
))
1107 return h
.hexdigest()
1109 def make_key(iden
, *a
, **kw
):
1111 A helper function for making memcached-usable cache keys out of
1112 arbitrary arguments. Hashes the arguments but leaves the `iden'
1116 iden
= _make_hashable(iden
)
1118 h
.update(_make_hashable(a
))
1119 h
.update(_make_hashable(kw
))
1121 return '%s(%s)' % (iden
, h
.hexdigest())
1125 from pylons
import app_globals
as g
1127 assert isinstance(ca
, StaleCacheChain
)
1129 ca
.localcache
.clear()
1131 ca
.stalecache
.set('foo', 'bar', time
=ca
.staleness
)
1132 assert ca
.stalecache
.get('foo') == 'bar'
1133 ca
.realcache
.set('foo', 'baz')
1134 assert ca
.realcache
.get('foo') == 'baz'
1136 assert ca
.get('foo', stale
=True) == 'bar'
1137 ca
.localcache
.clear()
1138 assert ca
.get('foo', stale
=False) == 'baz'
1139 ca
.localcache
.clear()
1141 assert ca
.get_multi(['foo'], stale
=True) == {'foo': 'bar'}
1142 assert len(ca
.localcache
) == 0
1143 assert ca
.get_multi(['foo'], stale
=False) == {'foo': 'baz'}
1144 ca
.localcache
.clear()