Commit 3911c24d by Dudás Ádám

Merge branch 'issue-69'

Conflicts:
	circle/dashboard/views.py
parents 52795118 de7cb13e
from collections import deque
from contextlib import contextmanager
from hashlib import sha224
from logging import getLogger
from time import time
......@@ -32,6 +33,18 @@ def activitycontextimpl(act, on_abort=None, on_commit=None):
act.finish(succeeded=True, event_handler=on_commit)
activity_context = contextmanager(activitycontextimpl)
activity_code_separator = '.'
def join_activity_code(*args):
"""Join the specified parts into an activity code.
"""
return activity_code_separator.join(args)
class ActivityModel(TimeStampedModel):
activity_code = CharField(max_length=100, verbose_name=_('activity code'))
parent = ForeignKey('self', blank=True, null=True, related_name='children')
......
from logging import getLogger
from .models import activity_context
from django.core.exceptions import PermissionDenied
logger = getLogger(__name__)
class Operation(object):
"""Base class for VM operations.
"""
async_queue = 'localhost.man'
required_perms = ()
def __call__(self, **kwargs):
return self.call(**kwargs)
def __init__(self, subject):
"""Initialize a new operation bound to the specified subject.
"""
self.subject = subject
def __unicode__(self):
return self.name
def __prelude(self, kwargs):
"""This method contains the shared prelude of call and async.
"""
skip_checks = kwargs.setdefault('system', False)
user = kwargs.setdefault('user', None)
parent_activity = kwargs.pop('parent_activity', None)
if not skip_checks:
self.check_auth(user)
self.check_precond()
return self.create_activity(parent=parent_activity, user=user)
def _exec_op(self, activity, user, **kwargs):
"""Execute the operation inside the specified activity's context.
"""
with activity_context(activity, on_abort=self.on_abort,
on_commit=self.on_commit):
return self._operation(activity, user, **kwargs)
def _operation(self, activity, user, system, **kwargs):
"""This method is the operation's particular implementation.
Deriving classes should implement this method.
"""
raise NotImplementedError
def async(self, **kwargs):
"""Execute the operation asynchronously.
Only a quick, preliminary check is ran before creating the associated
activity and queuing the job.
The returned value is the handle for the asynchronous job.
For more information, check the synchronous call's documentation.
"""
logger.info("%s called asynchronously with the following parameters: "
"%r", self.__class__.__name__, kwargs)
activity = self.__prelude(kwargs)
return self.async_operation.apply_async(args=(self.id,
self.subject.pk,
activity.pk),
kwargs=kwargs,
queue=self.async_queue)
def call(self, **kwargs):
"""Execute the operation (synchronously).
Anticipated keyword arguments:
* parent_activity: Parent activity for the operation. If this argument
is present, the operation's activity will be created
as a child activity of it.
* system: Indicates that the operation is invoked by the system, not a
User. If this argument is present and has a value of True,
then authorization checks are skipped.
* user: The User invoking the operation. If this argument is not
present, it'll be provided with a default value of None.
"""
logger.info("%s called (synchronously) with the following parameters: "
"%r", self.__class__.__name__, kwargs)
activity = self.__prelude(kwargs)
return self._exec_op(activity=activity, **kwargs)
def check_precond(self):
pass
def check_auth(self, user):
if not user.has_perms(self.required_perms):
raise PermissionDenied("%s doesn't have the required permissions."
% user)
def create_activity(self, parent, user):
raise NotImplementedError
def on_abort(self, activity, error):
"""This method is called when the operation aborts (i.e. raises an
exception).
"""
pass
def on_commit(self, activity):
"""This method is called when the operation executes successfully.
"""
pass
operation_registry_name = '_ops'
class OperatedMixin(object):
def __getattr__(self, name):
# NOTE: __getattr__ is only called if the attribute doesn't already
# exist in your __dict__
cls = self.__class__
ops = getattr(cls, operation_registry_name, {})
op = ops.get(name)
if op:
return op(self)
else:
raise AttributeError("%r object has no attribute %r" %
(self.__class__.__name__, name))
def register_operation(target_cls, op_cls, op_id=None):
"""Register the specified operation with the target class.
You can optionally specify an ID to be used for the registration;
otherwise, the operation class' 'id' attribute will be used.
"""
if op_id is None:
op_id = op_cls.id
if not issubclass(target_cls, OperatedMixin):
raise TypeError("%r is not a subclass of %r" %
(target_cls.__name__, OperatedMixin.__name__))
if not hasattr(target_cls, operation_registry_name):
setattr(target_cls, operation_registry_name, dict())
getattr(target_cls, operation_registry_name)[op_id] = op_cls
from mock import MagicMock, patch
from django.test import TestCase
from ..operations import Operation
class OperationTestCase(TestCase):
def test_activity_created_before_async_job(self):
class AbortEx(Exception):
pass
op = Operation(MagicMock())
op.activity_code_suffix = 'test'
op.id = 'test'
op.async_operation = MagicMock(
apply_async=MagicMock(side_effect=AbortEx))
with patch.object(Operation, 'check_precond'):
with patch.object(Operation, 'create_activity') as create_act:
try:
op.async(system=True)
except AbortEx:
self.assertTrue(create_act.called)
def test_check_precond_called_before_create_activity(self):
class AbortEx(Exception):
pass
op = Operation(MagicMock())
op.activity_code_suffix = 'test'
op.id = 'test'
with patch.object(Operation, 'create_activity', side_effect=AbortEx):
with patch.object(Operation, 'check_precond') as chk_pre:
try:
op.call(system=True)
except AbortEx:
self.assertTrue(chk_pre.called)
def test_auth_check_on_non_system_call(self):
op = Operation(MagicMock())
op.activity_code_suffix = 'test'
op.id = 'test'
user = MagicMock()
with patch.object(Operation, 'check_auth') as check_auth:
with patch.object(Operation, 'check_precond'), \
patch.object(Operation, 'create_activity'), \
patch.object(Operation, '_exec_op'):
op.call(user=user)
check_auth.assert_called_with(user)
def test_no_auth_check_on_system_call(self):
op = Operation(MagicMock())
op.activity_code_suffix = 'test'
op.id = 'test'
with patch.object(Operation, 'check_auth', side_effect=AssertionError):
with patch.object(Operation, 'check_precond'), \
patch.object(Operation, 'create_activity'), \
patch.object(Operation, '_exec_op'):
op.call(system=True)
......@@ -6,6 +6,7 @@ from django.core.urlresolvers import reverse
from django.contrib.auth.models import Permission
from vm.models import Instance, InstanceTemplate, Lease, Node
from vm.operations import WakeUpOperation
from ..models import Profile
from ..views import VmRenewView
from storage.models import Disk
......@@ -487,7 +488,7 @@ class VmDetailTest(LoginMixin, TestCase):
def test_permitted_wake_up_wrong_state(self):
c = Client()
self.login(c, "user2")
with patch.object(Instance, 'wake_up_async') as mock_method:
with patch.object(WakeUpOperation, 'async') as mock_method:
inst = Instance.objects.get(pk=1)
mock_method.side_effect = inst.wake_up
inst.manual_state_change('RUNNING')
......@@ -501,7 +502,7 @@ class VmDetailTest(LoginMixin, TestCase):
c = Client()
self.login(c, "user2")
with patch.object(Instance, 'select_node', return_value=None):
with patch.object(Instance, 'wake_up_async') as new_wake_up:
with patch.object(WakeUpOperation, 'async') as new_wake_up:
with patch('vm.tasks.vm_tasks.wake_up.apply_async') as wuaa:
inst = Instance.objects.get(pk=1)
new_wake_up.side_effect = inst.wake_up
......
......@@ -423,7 +423,7 @@ class VmDetailView(CheckedDetailView):
new_name = "Saved from %s (#%d) at %s" % (
self.object.name, self.object.pk, date
)
self.object.save_as_template_async(name=new_name,
self.object.save_as_template.async(name=new_name,
user=request.user)
messages.success(request, _("Saving instance as template!"))
return redirect("%s#activity" % self.object.get_absolute_url())
......@@ -433,7 +433,7 @@ class VmDetailView(CheckedDetailView):
if not self.object.has_level(request.user, 'owner'):
raise PermissionDenied()
self.object.shutdown_async(request.user)
self.object.shutdown.async(user=request.user)
return redirect("%s#activity" % self.object.get_absolute_url())
def __sleep(self, request):
......@@ -441,7 +441,7 @@ class VmDetailView(CheckedDetailView):
if not self.object.has_level(request.user, 'owner'):
raise PermissionDenied()
self.object.sleep_async(request.user)
self.object.sleep.async(user=request.user)
return redirect("%s#activity" % self.object.get_absolute_url())
def __wake_up(self, request):
......@@ -449,7 +449,7 @@ class VmDetailView(CheckedDetailView):
if not self.object.has_level(request.user, 'owner'):
raise PermissionDenied()
self.object.wake_up_async(request.user)
self.object.wake_up.async(user=request.user)
return redirect("%s#activity" % self.object.get_absolute_url())
def __deploy(self, request):
......@@ -457,7 +457,7 @@ class VmDetailView(CheckedDetailView):
if not self.object.has_level(request.user, 'owner'):
raise PermissionDenied()
self.object.deploy_async(request.user)
self.object.deploy.async(user=request.user)
return redirect("%s#activity" % self.object.get_absolute_url())
def __reset(self, request):
......@@ -465,7 +465,7 @@ class VmDetailView(CheckedDetailView):
if not self.object.has_level(request.user, 'owner'):
raise PermissionDenied()
self.object.reset_async(request.user)
self.object.reset.async(user=request.user)
return redirect("%s#activity" % self.object.get_absolute_url())
def __reboot(self, request):
......@@ -473,7 +473,7 @@ class VmDetailView(CheckedDetailView):
if not self.object.has_level(request.user, 'owner'):
raise PermissionDenied()
self.object.reboot_async(request.user)
self.object.reboot.async(user=request.user)
return redirect("%s#activity" % self.object.get_absolute_url())
def __shut_off(self, request):
......@@ -481,7 +481,7 @@ class VmDetailView(CheckedDetailView):
if not self.object.has_level(request.user, 'owner'):
raise PermissionDenied()
self.object.shut_off_async(request.user)
self.object.shut_off.async(user=request.user)
return redirect("%s#activity" % self.object.get_absolute_url())
......@@ -1148,7 +1148,7 @@ class VmCreate(LoginRequiredMixin, TemplateView):
def __deploy(self, request, instances, *args, **kwargs):
for i in instances:
i.deploy_async(user=request.user)
i.deploy.async(user=request.user)
if len(instances) > 1:
messages.success(request, _("Successfully created %d VMs!" %
......@@ -1286,7 +1286,7 @@ class VmDelete(LoginRequiredMixin, DeleteView):
if not object.has_level(request.user, 'owner'):
raise PermissionDenied()
object.destroy_async(user=request.user)
object.destroy.async(user=request.user)
success_url = self.get_success_url()
success_message = _("VM successfully deleted!")
......@@ -1466,7 +1466,7 @@ class NodeFlushView(LoginRequiredMixin, SuperuserRequiredMixin, DetailView):
def __flush(self, request):
self.object = self.get_object()
self.object.flush_async(user=request.user)
self.object.flush.async(user=request.user)
success_message = _("Node successfully flushed!")
messages.success(request, success_message)
return redirect(self.get_success_url())
......@@ -1537,7 +1537,7 @@ class VmMassDelete(LoginRequiredMixin, View):
raise PermissionDenied() # no need for rollback or proper
# error message, this can't
# normally happen.
i.destroy_async(request.user)
i.destroy.async(user=request.user)
names.append(i.name)
success_message = _("Mass delete complete, the following VMs were "
......@@ -2066,7 +2066,7 @@ class VmMigrateView(SuperuserRequiredMixin, TemplateView):
if node:
node = Node.objects.get(pk=node)
vm.migrate_async(to_node=node, user=self.request.user)
vm.migrate.async(to_node=node, user=self.request.user)
else:
messages.error(self.request, _("You didn't select a node!"))
......
# This import is responsible for running the operations' registration code.
from . import operations # noqa
......@@ -7,7 +7,11 @@ from django.db.models import CharField, ForeignKey
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from common.models import ActivityModel, activitycontextimpl
from common.models import (
ActivityModel, activitycontextimpl, join_activity_code,
)
logger = getLogger(__name__)
......@@ -24,6 +28,7 @@ class ActivityInProgressError(Exception):
class InstanceActivity(ActivityModel):
ACTIVITY_CODE_BASE = join_activity_code('vm', 'Instance')
instance = ForeignKey('Instance', related_name='activity_log',
help_text=_('Instance this activity works on.'),
verbose_name=_('instance'))
......@@ -65,9 +70,10 @@ class InstanceActivity(ActivityModel):
if concurrency_check and active_activities.exists():
raise ActivityInProgressError(active_activities[0])
act = cls(activity_code='vm.Instance.' + code_suffix,
instance=instance, parent=None, resultant_state=None,
started=timezone.now(), task_uuid=task_uuid, user=user)
activity_code = join_activity_code(cls.ACTIVITY_CODE_BASE, code_suffix)
act = cls(activity_code=activity_code, instance=instance, parent=None,
resultant_state=None, started=timezone.now(),
task_uuid=task_uuid, user=user)
act.save()
return act
......@@ -78,7 +84,7 @@ class InstanceActivity(ActivityModel):
raise ActivityInProgressError(active_children[0])
act = InstanceActivity(
activity_code=self.activity_code + '.' + code_suffix,
activity_code=join_activity_code(self.activity_code, code_suffix),
instance=self.instance, parent=self, resultant_state=None,
started=timezone.now(), task_uuid=task_uuid, user=self.user)
act.save()
......@@ -109,6 +115,7 @@ def instance_activity(code_suffix, instance, on_abort=None, on_commit=None,
class NodeActivity(ActivityModel):
ACTIVITY_CODE_BASE = join_activity_code('vm', 'Node')
node = ForeignKey('Node', related_name='activity_log',
help_text=_('Node this activity works on.'),
verbose_name=_('node'))
......@@ -131,15 +138,15 @@ class NodeActivity(ActivityModel):
@classmethod
def create(cls, code_suffix, node, task_uuid=None, user=None):
act = cls(activity_code='vm.Node.' + code_suffix,
node=node, parent=None, started=timezone.now(),
task_uuid=task_uuid, user=user)
activity_code = join_activity_code(cls.ACTIVITY_CODE_BASE, code_suffix)
act = cls(activity_code=activity_code, node=node, parent=None,
started=timezone.now(), task_uuid=task_uuid, user=user)
act.save()
return act
def create_sub(self, code_suffix, task_uuid=None):
act = NodeActivity(
activity_code=self.activity_code + '.' + code_suffix,
activity_code=join_activity_code(self.activity_code, code_suffix),
node=self.node, parent=self, started=timezone.now(),
task_uuid=task_uuid, user=self.user)
act.save()
......
......@@ -137,7 +137,7 @@ class Interface(Model):
iface.save()
return iface
def deploy(self, user=None, task_uuid=None):
def deploy(self):
if self.destroyed:
from .instance import Instance
raise Instance.InstanceDestroyedError(self.instance,
......@@ -149,16 +149,23 @@ class Interface(Model):
args=[self.get_vmnetwork_desc()],
queue=self.instance.get_remote_queue_name('net'))
def shutdown(self, user=None, task_uuid=None):
net_tasks.destroy.apply_async(
args=[self.get_vmnetwork_desc()],
queue=self.instance.get_remote_queue_name('net'))
def shutdown(self):
if self.destroyed:
from .instance import Instance
raise Instance.InstanceDestroyedError(self.instance,
"The associated instance "
"(%s) has already been "
"destroyed" % self.instance)
queue_name = self.instance.get_remote_queue_name('net')
net_tasks.destroy.apply_async(args=[self.get_vmnetwork_desc()],
queue=queue_name)
def destroy(self, user=None, task_uuid=None):
def destroy(self):
if self.destroyed:
return
self.shutdown(user, task_uuid)
self.shutdown()
if self.host is not None:
self.host.delete()
......
from __future__ import absolute_import, unicode_literals
from logging import getLogger
from warnings import warn
from django.db.models import (
CharField, IntegerField, ForeignKey, BooleanField, ManyToManyField,
FloatField, permalink,
)
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from warnings import warn
from celery.exceptions import TimeoutError
from model_utils.models import TimeStampedModel
from taggit.managers import TaggableManager
from common.models import method_cache, WorkerNotFound, HumanSortField
from common.operations import OperatedMixin
from firewall.models import Host
from ..tasks import vm_tasks, local_tasks
from .common import Trait
from .activity import node_activity, NodeActivity
from monitor.calvin.calvin import Query
from monitor.calvin.calvin import GraphiteHandler
from django.utils import timezone
from ..tasks import vm_tasks
from .activity import node_activity, NodeActivity
from .common import Trait
logger = getLogger(__name__)
......@@ -38,7 +37,7 @@ def node_available(function):
return decorate
class Node(TimeStampedModel):
class Node(OperatedMixin, TimeStampedModel):
"""A VM host machine, a hypervisor.
"""
......@@ -131,22 +130,6 @@ class Node(TimeStampedModel):
self.enabled = False
self.save()
def flush(self, user=None, task_uuid=None):
"""Disable node and move all instances to other ones.
"""
with node_activity('flush', node=self, user=user,
task_uuid=task_uuid) as act:
self.disable(user, act)
for i in self.instance_set.all():
with act.sub_activity('migrate_instance_%d' % i.pk):
i.migrate()
def flush_async(self, user=None):
"""Execute flush asynchronously.
"""
return local_tasks.flush.apply_async(args=[self, user],
queue="localhost.man")
def enable(self, user=None):
''' Enable the node. '''
if self.enabled is not True:
......@@ -164,10 +147,10 @@ class Node(TimeStampedModel):
@method_cache(30)
def get_remote_queue_name(self, queue_id):
"""Return the name of the remote celery queue for this node.
"""Returns the name of the remote celery queue for this node.
throws Exception if there is no worker on the queue.
Until the cache provide reult there can be dead queues.
Throws Exception if there is no worker on the queue.
The result may include dead queues because of caching.
"""
if vm_tasks.check_queue(self.host.hostname, queue_id):
......@@ -189,7 +172,7 @@ class Node(TimeStampedModel):
else:
logger.debug("The last activity was %s" % act)
if act.activity_code.endswith("offline"):
act = NodeActivity.create(code_suffix='monitor_succes_online',
act = NodeActivity.create(code_suffix='monitor_success_online',
node=self, user=None)
act.started = timezone.now()
act.finished = timezone.now()
......
......@@ -27,7 +27,7 @@ def garbage_collector(timeout=15):
now = timezone.now()
for i in Instance.objects.filter(destroyed_at=None).all():
if i.time_of_delete and now > i.time_of_delete:
i.destroy_async()
i.destroy.async()
logger.info("Expired instance %d destroyed.", i.pk)
try:
i.owner.profile.notify(
......@@ -39,7 +39,7 @@ def garbage_collector(timeout=15):
i.pk, unicode(e))
elif (i.time_of_suspend and now > i.time_of_suspend and
i.state == 'RUNNING'):
i.sleep_async()
i.sleep.async()
logger.info("Expired instance %d suspended." % i.pk)
try:
i.owner.profile.notify(
......
from manager.mancelery import celery
# TODO: Keep synchronised with Instance funcs
@celery.task
def deploy(instance, user):
instance.deploy(task_uuid=deploy.request.id, user=user)
@celery.task
def redeploy(instance, user):
instance.redeploy(task_uuid=redeploy.request.id, user=user)
@celery.task
def shut_off(instance, user):
instance.shut_off(task_uuid=shut_off.request.id, user=user)
def async_instance_operation(operation_id, instance_pk, activity_pk, **kwargs):
from vm.models import Instance, InstanceActivity
instance = Instance.objects.get(pk=instance_pk)
operation = getattr(instance, operation_id)
activity = InstanceActivity.objects.get(pk=activity_pk)
# save async task UUID to activity
activity.task_uuid = async_instance_operation.request.id
activity.save()
@celery.task
def destroy(instance, user):
instance.destroy(task_uuid=destroy.request.id, user=user)
@celery.task
def save_as_template(instance, name, user, params):
instance.save_as_template(name, task_uuid=save_as_template.request.id,
user=user, **params)
@celery.task
def sleep(instance, user):
instance.sleep(task_uuid=sleep.request.id, user=user)
return operation._exec_op(activity=activity, **kwargs)
@celery.task
def wake_up(instance, user):
instance.wake_up(task_uuid=wake_up.request.id, user=user)
def async_node_operation(operation_id, node_pk, activity_pk, **kwargs):
from vm.models import Node, NodeActivity
node = Node.objects.get(pk=node_pk)
operation = getattr(node, operation_id)
activity = NodeActivity.objects.get(pk=activity_pk)
# save async task UUID to activity
activity.task_uuid = async_node_operation.request.id
activity.save()
@celery.task
def shutdown(instance, user):
instance.shutdown(task_uuid=shutdown.request.id, user=user)
@celery.task
def reset(instance, user):
instance.reset(task_uuid=reset.request.id, user=user)
@celery.task
def reboot(instance, user):
instance.reboot(task_uuid=reboot.request.id, user=user)
@celery.task
def migrate(instance, to_node, user):
instance.migrate(to_node, task_uuid=migrate.request.id, user=user)
@celery.task
def flush(node, user):
node.flush(task_uuid=flush.request.id, user=user)
return operation._exec_op(activity=activity, **kwargs)
......@@ -19,18 +19,16 @@ def check_queue(node_hostname, queue_id):
active_queues = get_queues()
if active_queues is None:
return False
# v is List of List of queues dict
node_workers = [v for k, v in active_queues.iteritems()]
for worker in node_workers:
for queue in worker:
if queue['name'] == queue_name:
return True
return False
queue_names = (queue['name'] for worker in active_queues.itervalues()
for queue in worker)
return queue_name in queue_names
def get_queues():
"""Get active celery queues.
Returns a dictionary whose entries are (worker name;list of queues) pairs,
where queues are represented by dictionaries.
Result is cached for 10 seconds!
"""
key = __name__ + u'queues'
......
from datetime import datetime
from mock import Mock, MagicMock, patch, call
from django.contrib.auth.models import User
from django.test import TestCase
from django.utils.translation import ugettext_lazy as _
from mock import Mock, MagicMock, patch, call
from ..models import (
Lease, Node, Interface, Instance, InstanceTemplate, InstanceActivity,
)
from ..models.instance import find_unused_port, ActivityInProgressError
from ..operations import (
DeployOperation, DestroyOperation, FlushOperation, MigrateOperation,
)
class PortFinderTestCase(TestCase):
......@@ -52,50 +55,60 @@ class InstanceTestCase(TestCase):
def test_deploy_destroyed(self):
inst = Mock(destroyed_at=datetime.now(), spec=Instance,
InstanceDestroyedError=Instance.InstanceDestroyedError)
with self.assertRaises(Instance.InstanceDestroyedError):
Instance.deploy(inst)
deploy_op = DeployOperation(inst)
with patch.object(DeployOperation, 'create_activity'):
with self.assertRaises(Instance.InstanceDestroyedError):
deploy_op(system=True)
def test_destroy_destroyed(self):
inst = Mock(destroyed_at=datetime.now(), spec=Instance)
Instance.destroy(inst)
inst = Mock(destroyed_at=datetime.now(), spec=Instance,
InstanceDestroyedError=Instance.InstanceDestroyedError)
destroy_op = DestroyOperation(inst)