Commit b5653fec by Őry Máté

Merge branch 'feature-prefetch-cache' into 'master'

Feature: prefetch expiring cached methods values
parents 026be873 7d81b1b9
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
from collections import deque from collections import deque
from contextlib import contextmanager from contextlib import contextmanager
from functools import update_wrapper
from hashlib import sha224 from hashlib import sha224
from itertools import chain, imap from itertools import chain, imap
from logging import getLogger from logging import getLogger
...@@ -36,6 +37,7 @@ from django.utils.functional import Promise ...@@ -36,6 +37,7 @@ from django.utils.functional import Promise
from django.utils.translation import ugettext_lazy as _, ugettext_noop from django.utils.translation import ugettext_lazy as _, ugettext_noop
from jsonfield import JSONField from jsonfield import JSONField
from manager.mancelery import celery
from model_utils.models import TimeStampedModel from model_utils.models import TimeStampedModel
logger = getLogger(__name__) logger = getLogger(__name__)
...@@ -212,6 +214,38 @@ class ActivityModel(TimeStampedModel): ...@@ -212,6 +214,38 @@ class ActivityModel(TimeStampedModel):
self.result_data = None if value is None else value.to_dict() self.result_data = None if value is None else value.to_dict()
@celery.task()
def compute_cached(method, instance, memcached_seconds,
key, start, *args, **kwargs):
"""Compute and store actual value of cached method."""
if isinstance(method, basestring):
model, id = instance
instance = model.objects.get(id=id)
try:
method = getattr(model, method)
while hasattr(method, '_original') or hasattr(method, 'fget'):
try:
method = method._original
except AttributeError:
method = method.fget
except AttributeError:
logger.exception("Couldnt get original method of %s",
unicode(method))
raise
# call the actual method
result = method(instance, *args, **kwargs)
# save to memcache
cache.set(key, result, memcached_seconds)
elapsed = time() - start
cache.set("%s.cached" % key, 2, max(memcached_seconds * 0.5,
memcached_seconds * 0.75 - elapsed))
logger.debug('Value of <%s>.%s(%s)=<%s> saved to cache (%s elapsed).',
unicode(instance), method.__name__, unicode(args),
unicode(result), elapsed)
return result
def method_cache(memcached_seconds=60, instance_seconds=5): # noqa def method_cache(memcached_seconds=60, instance_seconds=5): # noqa
"""Cache return value of decorated method to memcached and memory. """Cache return value of decorated method to memcached and memory.
...@@ -233,9 +267,11 @@ def method_cache(memcached_seconds=60, instance_seconds=5): # noqa ...@@ -233,9 +267,11 @@ def method_cache(memcached_seconds=60, instance_seconds=5): # noqa
def inner_cache(method): def inner_cache(method):
method_name = method.__name__
def get_key(instance, *args, **kwargs): def get_key(instance, *args, **kwargs):
return sha224(unicode(method.__module__) + return sha224(unicode(method.__module__) +
unicode(method.__name__) + method_name +
unicode(instance.id) + unicode(instance.id) +
unicode(args) + unicode(args) +
unicode(kwargs)).hexdigest() unicode(kwargs)).hexdigest()
...@@ -254,21 +290,31 @@ def method_cache(memcached_seconds=60, instance_seconds=5): # noqa ...@@ -254,21 +290,31 @@ def method_cache(memcached_seconds=60, instance_seconds=5): # noqa
if vals['time'] + instance_seconds > now: if vals['time'] + instance_seconds > now:
# has valid on class cache, return that # has valid on class cache, return that
result = vals['value'] result = vals['value']
setattr(instance, key, {'time': now, 'value': result})
if result is None: if result is None:
result = cache.get(key) result = cache.get(key)
if invalidate or (result is None): if invalidate or (result is None):
# all caches failed, call the actual method logger.debug("all caches failed, compute now")
result = method(instance, *args, **kwargs) result = compute_cached(method, instance, memcached_seconds,
# save to memcache and class attr key, time(), *args, **kwargs)
cache.set(key, result, memcached_seconds)
setattr(instance, key, {'time': now, 'value': result}) setattr(instance, key, {'time': now, 'value': result})
logger.debug('Value of <%s>.%s(%s)=<%s> saved to cache.', elif not cache.get("%s.cached" % key):
unicode(instance), method.__name__, logger.debug("caches expiring, compute async")
unicode(args), unicode(result)) cache.set("%s.cached" % key, 1, memcached_seconds * 0.5)
try:
compute_cached.apply_async(
queue='localhost.man', kwargs=kwargs, args=[
method_name, (instance.__class__, instance.id),
memcached_seconds, key, time()] + list(args))
except:
logger.exception("Couldnt compute async %s", method_name)
return result return result
update_wrapper(x, method)
x._original = method
return x return x
return inner_cache return inner_cache
......
...@@ -28,7 +28,7 @@ def check_queue(storage, queue_id, priority): ...@@ -28,7 +28,7 @@ def check_queue(storage, queue_id, priority):
if priority is not None: if priority is not None:
queue_name = queue_name + "." + priority queue_name = queue_name + "." + priority
inspect = celery.control.inspect() inspect = celery.control.inspect()
inspect.timeout = 0.1 inspect.timeout = 0.5
active_queues = inspect.active_queues() active_queues = inspect.active_queues()
if active_queues is None: if active_queues is None:
return False return False
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import, unicode_literals from __future__ import absolute_import, unicode_literals
from functools import update_wrapper
from logging import getLogger from logging import getLogger
from warnings import warn from warnings import warn
import requests import requests
...@@ -51,6 +52,8 @@ def node_available(function): ...@@ -51,6 +52,8 @@ def node_available(function):
return function(self, *args, **kwargs) return function(self, *args, **kwargs)
else: else:
return None return None
update_wrapper(decorate, function)
decorate._original = function
return decorate return decorate
......
...@@ -55,7 +55,7 @@ def get_queues(): ...@@ -55,7 +55,7 @@ def get_queues():
result = cache.get(key) result = cache.get(key)
if result is None: if result is None:
inspect = celery.control.inspect() inspect = celery.control.inspect()
inspect.timeout = 0.1 inspect.timeout = 0.5
result = inspect.active_queues() result = inspect.active_queues()
logger.debug('Queue list of length %d cached.', len(result)) logger.debug('Queue list of length %d cached.', len(result))
cache.set(key, result, 10) cache.set(key, result, 10)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment