Commit ac146765 by Dudás Ádám

vm: make flush into operation

parent f469fe6a
from __future__ import absolute_import, unicode_literals from __future__ import absolute_import, unicode_literals
from logging import getLogger from logging import getLogger
from warnings import warn
from django.db.models import ( from django.db.models import (
CharField, IntegerField, ForeignKey, BooleanField, ManyToManyField, CharField, IntegerField, ForeignKey, BooleanField, ManyToManyField,
FloatField, permalink, FloatField, permalink,
) )
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from warnings import warn
from celery.exceptions import TimeoutError from celery.exceptions import TimeoutError
from model_utils.models import TimeStampedModel from model_utils.models import TimeStampedModel
from taggit.managers import TaggableManager from taggit.managers import TaggableManager
from common.models import method_cache, WorkerNotFound, HumanSortField from common.models import method_cache, WorkerNotFound, HumanSortField
from common.operations import OperatedMixin
from firewall.models import Host from firewall.models import Host
from ..tasks import vm_tasks, local_tasks
from .common import Trait
from .activity import node_activity, NodeActivity
from monitor.calvin.calvin import Query from monitor.calvin.calvin import Query
from monitor.calvin.calvin import GraphiteHandler from monitor.calvin.calvin import GraphiteHandler
from django.utils import timezone from ..tasks import vm_tasks
from .activity import node_activity, NodeActivity
from .common import Trait
logger = getLogger(__name__) logger = getLogger(__name__)
...@@ -38,7 +37,7 @@ def node_available(function): ...@@ -38,7 +37,7 @@ def node_available(function):
return decorate return decorate
class Node(TimeStampedModel): class Node(OperatedMixin, TimeStampedModel):
"""A VM host machine, a hypervisor. """A VM host machine, a hypervisor.
""" """
...@@ -131,22 +130,6 @@ class Node(TimeStampedModel): ...@@ -131,22 +130,6 @@ class Node(TimeStampedModel):
self.enabled = False self.enabled = False
self.save() self.save()
def flush(self, user=None, task_uuid=None):
"""Disable node and move all instances to other ones.
"""
with node_activity('flush', node=self, user=user,
task_uuid=task_uuid) as act:
self.disable(user, act)
for i in self.instance_set.all():
with act.sub_activity('migrate_instance_%d' % i.pk):
i.migrate()
def flush_async(self, user=None):
"""Execute flush asynchronously.
"""
return local_tasks.flush.apply_async(args=[self, user],
queue="localhost.man")
def enable(self, user=None): def enable(self, user=None):
''' Enable the node. ''' ''' Enable the node. '''
if self.enabled is not True: if self.enabled is not True:
......
...@@ -11,8 +11,10 @@ from celery.exceptions import TimeLimitExceeded ...@@ -11,8 +11,10 @@ from celery.exceptions import TimeLimitExceeded
from common.operations import Operation, register_operation from common.operations import Operation, register_operation
from storage.models import Disk from storage.models import Disk
from .tasks import vm_tasks from .tasks import vm_tasks
from .tasks.local_tasks import async_instance_operation from .tasks.local_tasks import async_instance_operation, async_node_operation
from .models import Instance, InstanceActivity, InstanceTemplate from .models import (
Instance, InstanceActivity, InstanceTemplate, Node, NodeActivity,
)
logger = getLogger(__name__) logger = getLogger(__name__)
...@@ -409,3 +411,35 @@ class WakeUpOperation(InstanceOperation): ...@@ -409,3 +411,35 @@ class WakeUpOperation(InstanceOperation):
register_instance_operation(WakeUpOperation) register_instance_operation(WakeUpOperation)
class NodeOperation(Operation):
async_operation = async_node_operation
def __init__(self, node):
super(NodeOperation, self).__init__(subject=node)
self.node = node
def create_activity(self, user):
return NodeActivity.create(code_suffix=self.activity_code_suffix,
node=self.node, user=user)
def register_node_operation(op_cls, op_id=None):
return register_operation(Node, op_cls, op_id)
class FlushOperation(NodeOperation):
activity_code_suffix = 'flush'
id = 'flush'
name = _("flush")
description = _("""Disable node and move all instances to other ones.""")
def _operation(self, activity, user, system):
self.node.disable(user, activity)
for i in self.node.instance_set.all():
with activity.sub_activity('migrate_instance_%d' % i.pk):
i.migrate()
register_node_operation(FlushOperation)
...@@ -16,5 +16,14 @@ def async_instance_operation(operation_id, instance_pk, activity_pk, **kwargs): ...@@ -16,5 +16,14 @@ def async_instance_operation(operation_id, instance_pk, activity_pk, **kwargs):
@celery.task @celery.task
def flush(node, user): def async_node_operation(operation_id, node_pk, activity_pk, **kwargs):
node.flush(task_uuid=flush.request.id, user=user) from vm.models import Node, NodeActivity
node = Node.objects.get(pk=node_pk)
operation = getattr(node, operation_id)
activity = NodeActivity.objects.get(pk=activity_pk)
# save async task UUID to activity
activity.task_uuid = async_node_operation.request.id
activity.save()
return operation._exec_op(activity=activity, **kwargs)
...@@ -10,7 +10,7 @@ from ..models import ( ...@@ -10,7 +10,7 @@ from ..models import (
) )
from ..models.instance import find_unused_port, ActivityInProgressError from ..models.instance import find_unused_port, ActivityInProgressError
from ..operations import ( from ..operations import (
DeployOperation, DestroyOperation, MigrateOperation DeployOperation, DestroyOperation, FlushOperation, MigrateOperation,
) )
...@@ -244,35 +244,37 @@ class InstanceActivityTestCase(TestCase): ...@@ -244,35 +244,37 @@ class InstanceActivityTestCase(TestCase):
subact.__enter__.assert_called() subact.__enter__.assert_called()
def test_flush(self): def test_flush(self):
node = MagicMock(spec=Node, enabled=True)
user = MagicMock(spec=User)
insts = [MagicMock(spec=Instance, migrate=MagicMock()), insts = [MagicMock(spec=Instance, migrate=MagicMock()),
MagicMock(spec=Instance, migrate=MagicMock())] MagicMock(spec=Instance, migrate=MagicMock())]
node = MagicMock(spec=Node, enabled=True)
node.instance_set.all.return_value = insts
user = MagicMock(spec=User)
flush_op = FlushOperation(node)
with patch('vm.models.node.node_activity') as na: with patch.object(FlushOperation, 'create_activity') as create_act:
act = na.return_value.__enter__.return_value = MagicMock() act = create_act.return_value = MagicMock()
node.instance_set.all.return_value = insts
Node.flush(node, user) flush_op(user=user)
na.__enter__.assert_called() create_act.assert_called()
node.disable.assert_called_with(user, act) node.disable.assert_called_with(user, act)
for i in insts: for i in insts:
i.migrate.assert_called() i.migrate.assert_called()
def test_flush_disabled_wo_user(self): def test_flush_disabled_wo_user(self):
node = MagicMock(spec=Node, enabled=False)
insts = [MagicMock(spec=Instance, migrate=MagicMock()), insts = [MagicMock(spec=Instance, migrate=MagicMock()),
MagicMock(spec=Instance, migrate=MagicMock())] MagicMock(spec=Instance, migrate=MagicMock())]
node = MagicMock(spec=Node, enabled=False)
node.instance_set.all.return_value = insts
flush_op = FlushOperation(node)
with patch('vm.models.node.node_activity') as na: with patch.object(FlushOperation, 'create_activity') as create_act:
act = na.return_value.__enter__.return_value = MagicMock() act = create_act.return_value = MagicMock()
node.instance_set.all.return_value = insts
Node.flush(node) flush_op(system=True)
create_act.assert_called()
node.disable.assert_called_with(None, act) node.disable.assert_called_with(None, act)
# ^ should be called, but real method no-ops if disabled # ^ should be called, but real method no-ops if disabled
na.__enter__.assert_called()
for i in insts: for i in insts:
i.migrate.assert_called() i.migrate.assert_called()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment