Commit bf71e2b0 by IK

Füred bővítés: json, picle_v2

parent 38777397
#
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_RESULT_EXPIRES = 300
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
CELERY_ACCEPT_CONTENT = ['json', 'pickle_v2', 'pickle']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'pickle_v2'
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True
...@@ -24,6 +24,7 @@ from logging import getLogger ...@@ -24,6 +24,7 @@ from logging import getLogger
from time import time from time import time
from warnings import warn from warnings import warn
from django.apps import apps
from django.contrib import messages from django.contrib import messages
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.cache import cache from django.core.cache import cache
...@@ -245,9 +246,51 @@ class ActivityModel(TimeStampedModel): ...@@ -245,9 +246,51 @@ class ActivityModel(TimeStampedModel):
def compute_cached(method, instance, memcached_seconds, def compute_cached(method, instance, memcached_seconds,
key, start, *args, **kwargs): key, start, *args, **kwargs):
"""Compute and store actual value of cached method.""" """Compute and store actual value of cached method."""
def _resolve_instance(instance_ref):
"""
Accpted forms:
- (ModelClass, pk) # legacy (pickle)
- ("app_label.ModelName", pk) # JSON-safe tuple
- {"model": "app_label.ModelName", "pk": pk} # JSON-safe dict
- {"model": "...", "id": pk} # 'id' key
"""
# dict forma
if isinstance(instance_ref, dict):
model_label = instance_ref.get('model')
pk = instance_ref.get('pk', instance_ref.get('id'))
if not model_label or pk is None:
raise TypeError("Bad instance_ref dict: %r" % (instance_ref,))
Model = apps.get_model(model_label)
if Model is None:
raise LookupError("Unknown model label: %r" % (model_label,))
return Model.objects.get(pk=pk)
# tuple/list forma
if isinstance(instance_ref, (tuple, list)) and len(instance_ref) == 2:
model_ref, pk = instance_ref
# legacy: ModelClass
try:
from django.db.models.base import ModelBase
if isinstance(model_ref, ModelBase):
Model = model_ref
else:
# "app_label.ModelName"
Model = apps.get_model(model_ref)
except Exception:
# if no ModelBase or the model_ref in not a class
Model = apps.get_model(model_ref)
if Model is None:
raise LookupError("Unknown model reference: %r" % (model_ref,))
return Model.objects.get(pk=pk)
raise TypeError("Unsupported instance_ref: %r" % (instance_ref,))
if isinstance(method, basestring): if isinstance(method, basestring):
model, id = instance # instance from any of the accepted forms
instance = model.objects.get(id=id) _inst = _resolve_instance(instance)
# the model came from the instance
model = _inst.__class__
instance = _inst
try: try:
method = getattr(model, method) method = getattr(model, method)
while hasattr(method, '_original') or hasattr(method, 'fget'): while hasattr(method, '_original') or hasattr(method, 'fget'):
...@@ -331,10 +374,12 @@ def method_cache(memcached_seconds=60, instance_seconds=5): # noqa ...@@ -331,10 +374,12 @@ def method_cache(memcached_seconds=60, instance_seconds=5): # noqa
logger.debug("caches expiring, compute async") logger.debug("caches expiring, compute async")
cache.set("%s.cached" % key, 1, memcached_seconds * 0.5) cache.set("%s.cached" % key, 1, memcached_seconds * 0.5)
try: try:
instance_ref = (instance._meta.label, instance.id) # biztosan tuple
compute_cached.apply_async( compute_cached.apply_async(
queue='localhost.man', kwargs=kwargs, args=[ queue='localhost.man', kwargs=kwargs, args=[
method_name, (instance.__class__, instance.id), method_name, instance_ref,
memcached_seconds, key, time()] + list(args)) memcached_seconds, key, time()] + list(args))
# method_name, (instance.__class__, instance.id),
except: except:
logger.exception("Couldnt compute async %s", method_name) logger.exception("Couldnt compute async %s", method_name)
......
...@@ -125,6 +125,129 @@ class Operation(object): ...@@ -125,6 +125,129 @@ class Operation(object):
""" """
raise NotImplementedError raise NotImplementedError
@staticmethod
def __find_not_jsonable(obj, path="$", seen=None):
# --- normalize path to string (handles tuple too) ---
if not isinstance(path, basestring):
try:
path = unicode(path)
except Exception:
path = str(path)
if seen is None:
seen = set()
import types
from decimal import Decimal
from datetime import datetime, date
try:
from django.db import models
from django.db.models.query import QuerySet
from django.utils.functional import Promise
except Exception:
models = QuerySet = Promise = None
if obj is None or isinstance(obj, (bool, int, long, float, basestring)):
return None
oid = id(obj)
if oid in seen:
return u"%s (seen)" % (path,)
seen.add(oid)
if models and isinstance(obj, models.Model):
return u"%s (Django Model)" % (path,)
if QuerySet and isinstance(obj, QuerySet):
return u"%s (QuerySet)" % (path,)
if isinstance(obj, (datetime, date)):
return u"%s (datetime/date)" % (path,)
if isinstance(obj, Decimal):
return u"%s (Decimal)" % (path,)
if Promise and isinstance(obj, Promise):
return u"%s (Promise/lazy)" % (path,)
if isinstance(obj, set):
return u"%s (set)" % (path,)
if hasattr(obj, "__call__"):
return u"%s (callable)" % (path,)
if isinstance(obj, dict):
bad_s = u""
bad_b = None
for k, v in obj.iteritems():
kstr = k if isinstance(k, basestring) else unicode(k)
bad = Operation.__find_not_jsonable(v, u"%s[%r]" % (path, kstr), seen)
if bad:
if bad_b:
bad_s = u"%s, %s" % (bad_s, bad)
else:
bad_s = bad
bad_b = True
if bad_b:
return bad_s
return None
if isinstance(obj, (list, tuple)):
bad_s = u""
bad_b = None
for i, v in enumerate(obj):
bad = Operation.__find_not_jsonable(v, u"%s[%d]" % (path, i), seen)
if bad:
if bad_b:
bad_s = u"%s, %s" % (bad_s, bad)
else:
bad_s = bad
bad_b = True
if bad_b:
return bad_s
return None
return u"%s (%s)" % (path, obj.__class__.__name__)
def __to_jsonable(x, seen=None):
if seen is None: seen = set()
oid = id(x)
if oid in seen:
return '<cycle>'
seen.add(oid)
import types
from decimal import Decimal
from datetime import datetime, date
try:
from django.db import models
from django.db.models.query import QuerySet
from django.utils.functional import Promise
except Exception:
models = QuerySet = Promise = None
if x is None or isinstance(x, (bool, int, long, float, basestring)):
return x
if models and isinstance(x, models.Model):
return getattr(x, 'pk', None)
if QuerySet and isinstance(x, QuerySet):
return list(x.values_list('pk', flat=True))
if isinstance(x, (datetime, date)):
return x.isoformat()
if isinstance(x, Decimal):
return str(x)
if isinstance(x, set):
return [to_jsonable(v, seen) for v in x]
if Promise and isinstance(x, Promise):
return unicode(x) # lazy evaluation
if isinstance(x, dict):
out = {}
for k, v in x.iteritems():
k = k if isinstance(k, basestring) else unicode(k)
out[k] = to_jsonable(v, seen)
return out
if isinstance(x, (list, tuple)):
return [to_jsonable(v, seen) for v in x]
if hasattr(x, "__call__"):
return getattr(x, "__name__", repr(x))
return repr(x)
def async(self, **kwargs): def async(self, **kwargs):
"""Execute the operation asynchronously. """Execute the operation asynchronously.
...@@ -135,13 +258,34 @@ class Operation(object): ...@@ -135,13 +258,34 @@ class Operation(object):
For more information, check the synchronous call's documentation. For more information, check the synchronous call's documentation.
""" """
from common.transport import to_wire
logger.info("%s called asynchronously on %s with the following " logger.info("%s called asynchronously on %s with the following "
"parameters: %r", self.__class__.__name__, self.subject, "queue: %r parameters: %r", self.__class__.__name__, self.subject, self.async_queue,
kwargs) kwargs)
activity, allargs, auxargs = self.__prelude(kwargs) activity, allargs, auxargs = self.__prelude(kwargs)
return self.async_operation.apply_async( bad = self.__find_not_jsonable((allargs, auxargs))
args=(self.id, self.subject.pk, activity.pk, allargs, auxargs, ), if bad:
queue=self.async_queue) logger.info("JSON-inkompatibility element: %s", bad)
sid = getattr(self, 'subject_id', None) or self.subject.pk
aid = getattr(activity, 'id', None) or activity.pk
allargs_wire = to_wire(allargs)
auxargs_wire = to_wire(auxargs)
logger.info("allargs_wire keys: %r", sorted(allargs_wire.keys()))
logger.info("auxargs_wire keys: %r", sorted(auxargs_wire.keys()))
import json
preview = json.dumps(allargs_wire, default=str, ensure_ascii=False)[:500]
logger.info("allargs JSON preview: %s", preview)
async_ret = self.async_operation.apply_async(
args=(self.id, sid, aid, allargs_wire, auxargs_wire),
queue=self.async_queue,
)
logger.info("async_ret %r", async_ret)
return async_ret
def call(self, **kwargs): def call(self, **kwargs):
"""Execute the operation (synchronously). """Execute the operation (synchronously).
......
# serializers.py
from kombu.serialization import register
import pickle
def pickle_v2_dumps(obj):
return pickle.dumps(obj, protocol=2)
def pickle_v2_loads(s):
return pickle.loads(s)
register(
'pickle_v2',
pickle_v2_dumps,
pickle_v2_loads,
content_type='application/x-pickle',
content_encoding='binary',
)
# -*- coding: utf-8 -*-
# circle/common/transport.py
from django.db import models
from django.db.models.query import QuerySet
from django.utils.functional import SimpleLazyObject, Promise
from datetime import datetime, date
from decimal import Decimal
from django.apps import apps
# Global keys
GLOBAL_MODEL_FIELDS = {
'user': 'auth.User',
'node': 'vm.Node',
'to_node': 'vm.Node',
'instance': 'vm.Instance',
'activity': 'vm.InstanceActivity',
'vlan': 'firewall.Vlan',
}
MODEL_TAG_KEY = '__model__'
def _merge_fields(extra):
"""extra: dict or None. Returna an usable mapping (global + override)."""
mf = GLOBAL_MODEL_FIELDS.copy()
if extra:
mf.update(extra)
return mf
def _model_label(obj):
# "app_label.ModelName"
return obj._meta.app_label + '.' + obj.__class__.__name__
def to_wire(data, model_fields=None):
"""Recursive JSON-friendly encoder
- model_fields: opcional dict {key -> 'app_label.ModelName'} – per-message
- known Django model -> pk
- unknown Django model -> {__model__: "app.Model", pk: <id>}
- QuerySet -> [pk, ...]
- datetime/date -> ISO
- Decimal -> str
- set -> list
- keys -> always strings (JSON)
"""
mf = _merge_fields(model_fields)
if data is None or isinstance(data, (bool, int, long, float, basestring)):
return data
if isinstance(data, models.Model):
return {MODEL_TAG_KEY: _model_label(data), 'pk': data.pk}
if isinstance(data, SimpleLazyObject):
# example: request.user –
try:
return to_wire(data._wrapped, model_fields=mf)
except Exception:
return to_wire(data.__dict__, model_fields=mf)
if isinstance(data, QuerySet):
return list(data.values_list('pk', flat=True))
if isinstance(data, (datetime, date)):
return data.isoformat()
if isinstance(data, Decimal):
return str(data)
if isinstance(data, set):
return [to_wire(v, model_fields=mf) for v in data]
if isinstance(data, dict):
out = {}
for k, v in data.iteritems():
if not isinstance(k, basestring):
k = unicode(k)
# If key found in the mapping -> pk
if k in mf and isinstance(v, models.Model):
out[k] = getattr(v, 'pk', None)
else:
out[k] = to_wire(v, model_fields=mf)
return out
if isinstance(data, (list, tuple)):
return [to_wire(v, model_fields=mf) for v in data]
if isinstance(data, Promise):
return unicode(data)
if hasattr(data, '__call__'):
# bound method/function name – JSON-friendly
return getattr(data, '__name__', unicode(data))
# fallback: reprezentation
return unicode(data)
def from_wire(data, model_fields=None):
"""Recursive JSON-friendly decoder
- Django model objects are loaded form Django DB using the MODEL_FIELDS
"""
mf = _merge_fields(model_fields)
# tagged model
if isinstance(data, dict) and MODEL_TAG_KEY in data and 'pk' in data:
label = data[MODEL_TAG_KEY]
pk = data['pk']
Model = apps.get_model(label)
return Model.objects.get(pk=pk)
if isinstance(data, dict):
out = {}
for k, v in data.iteritems():
if k in mf and v is not None:
Model = apps.get_model(mf[k])
out[k] = Model.objects.get(pk=v)
else:
out[k] = from_wire(v, model_fields=mf)
return out
if isinstance(data, (list, tuple)):
return [from_wire(v, model_fields=mf) for v in data]
# primitives
return data
...@@ -32,6 +32,8 @@ ...@@ -32,6 +32,8 @@
{# <img src="//chart.googleapis.com/chart?chs=255x255&chld=L|0&cht=qr&chl={{ uri }}"/> #} {# <img src="//chart.googleapis.com/chart?chs=255x255&chld=L|0&cht=qr&chl={{ uri }}"/> #}
<img src="{{ uri | qrcode_src }}" with="300"i height=300 alt="{{ uri }}"> <img src="{{ uri | qrcode_src }}" with="300"i height=300 alt="{{ uri }}">
{# <small><a href="{{ uri }}">{{ uri }}</a></small> #} {# <small><a href="{{ uri }}">{{ uri }}</a></small> #}
<img src="//quickchart.io/chart?chs=255x255&chld=L|0&cht=qr&chl={{ uri }}"/>
<small><a href="{{ uri }}">{{ uri }}</a></small>
</div> </div>
<hr /> <hr />
<div id="two-factor-confirm"> <div id="two-factor-confirm">
......
...@@ -241,13 +241,15 @@ class Rule(models.Model): ...@@ -241,13 +241,15 @@ class Rule(models.Model):
for foreign_vlan in self.foreign_network.vlans.all(): for foreign_vlan in self.foreign_network.vlans.all():
if not foreign_vlan.managed: if not foreign_vlan.managed:
continue continue
try:
r = IptRule(priority=self.weight, action=action, r = IptRule(priority=self.weight, action=action,
proto=self.proto, extra=self.extra, proto=self.proto, extra=self.extra,
comment='Rule #%s' % self.pk, comment='Rule #%s' % self.pk,
src=src, dst=dst, dport=self.dport, sport=self.sport) src=src, dst=dst, dport=self.dport, sport=self.sport)
chain_name = self.get_chain_name(local=vlan, remote=foreign_vlan) chain_name = self.get_chain_name(local=vlan, remote=foreign_vlan)
retval[chain_name] = r retval[chain_name] = r
except:
logger.error("Error in rule #%s", self.pk)
return retval return retval
......
...@@ -36,17 +36,25 @@ def _apply_once(name, tasks, queues, task, data): ...@@ -36,17 +36,25 @@ def _apply_once(name, tasks, queues, task, data):
if name not in tasks: if name not in tasks:
return return
data = data() logger.info("_apply_once...%s %s", name, queues)
data = data() # for testing the secondary fw
for queue in queues: for queue in queues:
try: try:
task.apply_async(args=data, queue=queue, expires=60).get(timeout=5) task.apply_async(args=data, queue=queue, expires=60).get(timeout=5)
logger.info("_apply...%s %s", name, queue)
# data = data()
logger.info("%s configuration is reloaded. (queue: %s)", logger.info("%s configuration is reloaded. (queue: %s)",
name, queue) name, queue)
# break
except TimeoutError as e: except TimeoutError as e:
logger.critical('%s (queue: %s, task: %s)', e, queue, name) logger.critical('%s (queue: %s, task: %s)', e, queue, name)
except: except:
logger.critical('Unhandled exception: queue: %s data: %s task: %s', logger.critical('Unhandled exception: queue: %s data: %s task: %s',
queue, data, name, exc_info=True) queue, data, name, exc_info=True)
logger.info('Unhandled exception: queue: %s data: %s task: %s',
queue, data, name, exc_info=True)
def get_firewall_queues(): def get_firewall_queues():
...@@ -95,6 +103,7 @@ def reloadtask_worker(): ...@@ -95,6 +103,7 @@ def reloadtask_worker():
lambda: (vlan(), )) lambda: (vlan(), ))
_apply_once('blacklist', tasks, firewall_queues, reload_blacklist, _apply_once('blacklist', tasks, firewall_queues, reload_blacklist,
lambda: (list(ipset()), )) lambda: (list(ipset()), ))
logger.info("reloadtask_worker: Reloaded %s", ", ".join(tasks))
@celery.task @celery.task
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from celery import Celery from celery import Celery
import common.serializers
from celery.signals import worker_ready from celery.signals import worker_ready
from celery.schedules import crontab from celery.schedules import crontab
from datetime import timedelta from datetime import timedelta
...@@ -52,9 +53,9 @@ celery = Celery('manager', ...@@ -52,9 +53,9 @@ celery = Celery('manager',
'dashboard.tasks.local_periodic_tasks', 'dashboard.tasks.local_periodic_tasks',
]) ])
celery.config_from_object('celeryconfig')
celery.conf.update( celery.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=( CELERY_QUEUES=(
Queue(HOSTNAME + '.man', Exchange('manager', type='direct'), Queue(HOSTNAME + '.man', Exchange('manager', type='direct'),
routing_key="manager"), routing_key="manager"),
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from celery import Celery from celery import Celery
import common.serializers
from celery.signals import worker_ready from celery.signals import worker_ready
from datetime import timedelta from datetime import timedelta
from kombu import Queue, Exchange from kombu import Queue, Exchange
...@@ -30,9 +31,9 @@ celery = Celery('monitor', ...@@ -30,9 +31,9 @@ celery = Celery('monitor',
'monitor.tasks.local_periodic_tasks', 'monitor.tasks.local_periodic_tasks',
]) ])
celery.config_from_object('celeryconfig')
celery.conf.update( celery.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=( CELERY_QUEUES=(
Queue(QUEUE_NAME, Exchange('monitor', type='direct'), Queue(QUEUE_NAME, Exchange('monitor', type='direct'),
routing_key="monitor"), routing_key="monitor"),
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from celery import Celery from celery import Celery
import common.serializers
from celery.signals import worker_ready from celery.signals import worker_ready
from datetime import timedelta from datetime import timedelta
from kombu import Queue, Exchange from kombu import Queue, Exchange
...@@ -32,9 +33,10 @@ celery = Celery('manager.slow', ...@@ -32,9 +33,10 @@ celery = Celery('manager.slow',
'storage.tasks.periodic_tasks', 'storage.tasks.periodic_tasks',
]) ])
celery.config_from_object('celeryconfig')
celery.conf.update( celery.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=( CELERY_QUEUES=(
Queue(QUEUE_NAME, Exchange('manager.slow', type='direct'), Queue(QUEUE_NAME, Exchange('manager.slow', type='direct'),
routing_key="manager.slow"), routing_key="manager.slow"),
......
...@@ -65,6 +65,7 @@ class RemoteOperationMixin(object): ...@@ -65,6 +65,7 @@ class RemoteOperationMixin(object):
def _operation(self, **kwargs): def _operation(self, **kwargs):
args = self._get_remote_args(**kwargs) args = self._get_remote_args(**kwargs)
logger.info("RemoteOpMixin %s %s %r", self.id, self._get_remote_queue(), args);
return self.task.apply_async( return self.task.apply_async(
args=args, queue=self._get_remote_queue() args=args, queue=self._get_remote_queue()
).get(timeout=self.remote_timeout) ).get(timeout=self.remote_timeout)
......
...@@ -17,16 +17,20 @@ ...@@ -17,16 +17,20 @@
from celery.contrib.abortable import AbortableTask from celery.contrib.abortable import AbortableTask
from manager.mancelery import celery from manager.mancelery import celery
from common.transport import from_wire
@celery.task(base=AbortableTask, bind=True) @celery.task(base=AbortableTask, bind=True)
def abortable_async_instance_operation(task, operation_id, instance_pk, def abortable_async_instance_operation(task, operation_id, instance_pk,
activity_pk, allargs, auxargs): activity_pk, allargs_wire, auxargs_wire):
from vm.models import Instance, InstanceActivity from vm.models import Instance, InstanceActivity
instance = Instance.objects.get(pk=instance_pk) instance = Instance.objects.get(pk=instance_pk)
operation = getattr(instance, operation_id) operation = getattr(instance, operation_id)
activity = InstanceActivity.objects.get(pk=activity_pk) activity = InstanceActivity.objects.get(pk=activity_pk)
allargs = from_wire(allargs_wire)
auxargs = from_wire(auxargs_wire)
# save async task UUID to activity # save async task UUID to activity
activity.task_uuid = task.request.id activity.task_uuid = task.request.id
activity.save() activity.save()
...@@ -39,12 +43,15 @@ def abortable_async_instance_operation(task, operation_id, instance_pk, ...@@ -39,12 +43,15 @@ def abortable_async_instance_operation(task, operation_id, instance_pk,
@celery.task(base=AbortableTask, bind=True) @celery.task(base=AbortableTask, bind=True)
def abortable_async_node_operation(task, operation_id, node_pk, activity_pk, def abortable_async_node_operation(task, operation_id, node_pk, activity_pk,
allargs, auxargs): allargs_wire, auxargs_wire):
from vm.models import Node, NodeActivity from vm.models import Node, NodeActivity
node = Node.objects.get(pk=node_pk) node = Node.objects.get(pk=node_pk)
operation = getattr(node, operation_id) operation = getattr(node, operation_id)
activity = NodeActivity.objects.get(pk=activity_pk) activity = NodeActivity.objects.get(pk=activity_pk)
allargs = from_wire(allargs_wire)
auxargs = from_wire(auxargs_wire)
# save async task UUID to activity # save async task UUID to activity
activity.task_uuid = task.request.id activity.task_uuid = task.request.id
activity.save() activity.save()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment