Commit faaace05 by Őry Máté

Merge branch 'feature-vm-tasks' into 'master'

Sub- and Remote Operations

closes #304

requires [vmdriver!4](vmdriver!4)

See merge request !220
parents 664f2527 5254e2ad
......@@ -26,6 +26,17 @@ from .models import activity_context, has_suffix, humanize_exception
logger = getLogger(__name__)
class SubOperationMixin(object):
required_perms = ()
def create_activity(self, parent, user, kwargs):
if not parent:
raise TypeError("SubOperation can only be called with "
"parent_activity specified.")
return super(SubOperationMixin, self).create_activity(
parent, user, kwargs)
class Operation(object):
"""Base class for VM operations.
"""
......@@ -36,6 +47,10 @@ class Operation(object):
abortable = False
has_percentage = False
@classmethod
def get_activity_code_suffix(cls):
return cls.id
def __call__(self, **kwargs):
return self.call(**kwargs)
......@@ -232,7 +247,7 @@ class OperatedMixin(object):
operation could be found.
"""
for op in getattr(self, operation_registry_name, {}).itervalues():
if has_suffix(activity_code, op.activity_code_suffix):
if has_suffix(activity_code, op.get_activity_code_suffix()):
return op(self)
else:
return None
......
......@@ -27,9 +27,7 @@ class OperationTestCase(TestCase):
class AbortEx(Exception):
pass
op = Operation(MagicMock())
op.activity_code_suffix = 'test'
op.id = 'test'
op = TestOp(MagicMock())
op.async_operation = MagicMock(
apply_async=MagicMock(side_effect=AbortEx))
......@@ -44,9 +42,7 @@ class OperationTestCase(TestCase):
class AbortEx(Exception):
pass
op = Operation(MagicMock())
op.activity_code_suffix = 'test'
op.id = 'test'
op = TestOp(MagicMock())
with patch.object(Operation, 'create_activity', side_effect=AbortEx):
with patch.object(Operation, 'check_precond') as chk_pre:
try:
......@@ -55,9 +51,7 @@ class OperationTestCase(TestCase):
self.assertTrue(chk_pre.called)
def test_auth_check_on_non_system_call(self):
op = Operation(MagicMock())
op.activity_code_suffix = 'test'
op.id = 'test'
op = TestOp(MagicMock())
user = MagicMock()
with patch.object(Operation, 'check_auth') as check_auth:
with patch.object(Operation, 'check_precond'), \
......@@ -67,9 +61,7 @@ class OperationTestCase(TestCase):
check_auth.assert_called_with(user)
def test_no_auth_check_on_system_call(self):
op = Operation(MagicMock())
op.activity_code_suffix = 'test'
op.id = 'test'
op = TestOp(MagicMock())
with patch.object(Operation, 'check_auth', side_effect=AssertionError):
with patch.object(Operation, 'check_precond'), \
patch.object(Operation, 'create_activity'), \
......@@ -77,39 +69,25 @@ class OperationTestCase(TestCase):
op.call(system=True)
def test_no_exception_for_more_arguments_when_operation_takes_kwargs(self):
class KwargOp(Operation):
activity_code_suffix = 'test'
id = 'test'
def _operation(self, **kwargs):
pass
op = KwargOp(MagicMock())
with patch.object(KwargOp, 'create_activity'), \
patch.object(KwargOp, '_exec_op'):
op = TestOp(MagicMock())
with patch.object(TestOp, 'create_activity'), \
patch.object(TestOp, '_exec_op'):
op.call(system=True, foo=42)
def test_exception_for_unexpected_arguments(self):
class TestOp(Operation):
activity_code_suffix = 'test'
id = 'test'
def _operation(self):
pass
op = TestOp(MagicMock())
with patch.object(TestOp, 'create_activity'), \
patch.object(TestOp, '_exec_op'):
self.assertRaises(TypeError, op.call, system=True, foo=42)
self.assertRaises(TypeError, op.call, system=True, bar=42)
def test_exception_for_missing_arguments(self):
class TestOp(Operation):
activity_code_suffix = 'test'
op = TestOp(MagicMock())
with patch.object(TestOp, 'create_activity'):
self.assertRaises(TypeError, op.call, system=True)
class TestOp(Operation):
id = 'test'
def _operation(self, foo):
pass
op = TestOp(MagicMock())
with patch.object(TestOp, 'create_activity'):
self.assertRaises(TypeError, op.call, system=True)
......@@ -512,20 +512,20 @@ class VmDetailTest(LoginMixin, TestCase):
self.login(c, "user2")
with patch.object(Instance, 'select_node', return_value=None), \
patch.object(WakeUpOperation, 'async') as new_wake_up, \
patch('vm.tasks.vm_tasks.wake_up.apply_async') as wuaa, \
patch.object(Instance.WrongStateError, 'send_message') as wro:
inst = Instance.objects.get(pk=1)
new_wake_up.side_effect = inst.wake_up
inst._wake_up_vm = Mock()
inst.get_remote_queue_name = Mock(return_value='test')
inst.status = 'SUSPENDED'
inst.set_level(self.u2, 'owner')
with patch('dashboard.views.messages') as msg:
response = c.post("/dashboard/vm/1/op/wake_up/")
assert not msg.error.called
assert inst._wake_up_vm.called
self.assertEqual(response.status_code, 302)
self.assertEqual(inst.status, 'RUNNING')
assert new_wake_up.called
assert wuaa.called
assert not wro.called
def test_unpermitted_wake_up(self):
......
......@@ -7,7 +7,6 @@ from .common import BaseResourceConfigModel
from .common import Lease
from .common import NamedBaseResourceConfig
from .common import Trait
from .instance import InstanceActiveManager
from .instance import VirtualMachineDescModel
from .instance import InstanceTemplate
from .instance import Instance
......@@ -19,7 +18,7 @@ from .network import Interface
from .node import Node
__all__ = [
'InstanceActivity', 'InstanceActiveManager', 'BaseResourceConfigModel',
'InstanceActivity', 'BaseResourceConfigModel',
'NamedBaseResourceConfig', 'VirtualMachineDescModel', 'InstanceTemplate',
'Instance', 'instance_activity', 'post_state_changed', 'pre_state_changed',
'InterfaceTemplate', 'Interface', 'Trait', 'Node', 'NodeActivity', 'Lease',
......
......@@ -20,11 +20,8 @@ from datetime import timedelta
from functools import partial
from importlib import import_module
from logging import getLogger
from string import ascii_lowercase
from warnings import warn
from celery.exceptions import TimeoutError
from celery.contrib.abortable import AbortableAsyncResult
import django.conf
from django.contrib.auth.models import User
from django.core import signing
......@@ -38,15 +35,16 @@ from django.utils import timezone
from django.utils.translation import ugettext_lazy as _, ugettext_noop
from model_utils import Choices
from model_utils.managers import QueryManager
from model_utils.models import TimeStampedModel, StatusModel
from taggit.managers import TaggableManager
from acl.models import AclBase
from common.models import (
create_readable, HumanReadableException, humanize_exception
create_readable, HumanReadableException,
)
from common.operations import OperatedMixin
from ..tasks import vm_tasks, agent_tasks
from ..tasks import agent_tasks
from .activity import (ActivityInProgressError, instance_activity,
InstanceActivity)
from .common import BaseResourceConfigModel, Lease
......@@ -92,13 +90,6 @@ def find_unused_vnc_port():
return port
class InstanceActiveManager(Manager):
def get_query_set(self):
return super(InstanceActiveManager,
self).get_query_set().filter(destroyed_at=None)
class VirtualMachineDescModel(BaseResourceConfigModel):
"""Abstract base for virtual machine describing models.
......@@ -264,7 +255,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
help_text=_("The virtual machine's time of "
"destruction."))
objects = Manager()
active = InstanceActiveManager()
active = QueryManager(destroyed_at=None)
class Meta:
app_label = 'vm'
......@@ -745,75 +736,6 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
"""
return scheduler.select_node(self, Node.objects.all())
def attach_disk(self, disk, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.attach_disk.apply_async(
args=[self.vm_name,
disk.get_vmdisk_desc()],
queue=queue_name
).get(timeout=timeout)
def detach_disk(self, disk, timeout=15):
try:
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.detach_disk.apply_async(
args=[self.vm_name,
disk.get_vmdisk_desc()],
queue=queue_name
).get(timeout=timeout)
except Exception as e:
if e.libvirtError and "not found" in str(e):
logger.debug("Disk %s was not found."
% disk.name)
else:
raise
def attach_network(self, network, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.attach_network.apply_async(
args=[self.vm_name,
network.get_vmnetwork_desc()],
queue=queue_name
).get(timeout=timeout)
def detach_network(self, network, timeout=15):
try:
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.detach_network.apply_async(
args=[self.vm_name,
network.get_vmnetwork_desc()],
queue=queue_name
).get(timeout=timeout)
except Exception as e:
if e.libvirtError and "not found" in str(e):
logger.debug("Interface %s was not found."
% (network.__unicode__()))
else:
raise
def resize_disk_live(self, disk, size, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'slow')
result = vm_tasks.resize_disk.apply_async(
args=[self.vm_name, disk.path, size],
queue=queue_name).get(timeout=timeout)
disk.size = size
disk.save()
return result
def deploy_disks(self):
"""Deploy all associated disks.
"""
devnums = list(ascii_lowercase) # a-z
for disk in self.disks.all():
# assign device numbers
if disk.dev_num in devnums:
devnums.remove(disk.dev_num)
else:
disk.dev_num = devnums.pop(0)
disk.save()
# deploy disk
disk.deploy()
def destroy_disks(self):
"""Destroy all associated disks.
"""
......@@ -838,103 +760,9 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
for net in self.interface_set.all():
net.shutdown()
def delete_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'fast')
try:
return vm_tasks.destroy.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
except Exception as e:
if e.libvirtError and "Domain not found" in str(e):
logger.debug("Domain %s was not found at %s"
% (self.vm_name, queue_name))
else:
raise
def deploy_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.deploy.apply_async(args=[self.get_vm_desc()],
queue=queue_name
).get(timeout=timeout)
def migrate_vm(self, to_node, timeout=120):
queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.migrate.apply_async(args=[self.vm_name,
to_node.host.hostname,
True],
queue=queue_name
).get(timeout=timeout)
def reboot_vm(self, timeout=5):
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.reboot.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def reset_vm(self, timeout=5):
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.reset.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def resume_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.resume.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def shutdown_vm(self, task=None, step=5):
queue_name = self.get_remote_queue_name('vm', 'slow')
logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name,
self.vm_name)
remote = vm_tasks.shutdown.apply_async(kwargs={'name': self.vm_name},
queue=queue_name)
while True:
try:
return remote.get(timeout=step)
except TimeoutError as e:
if task is not None and task.is_aborted():
AbortableAsyncResult(remote.id).abort()
raise humanize_exception(ugettext_noop(
"Operation aborted by user."), e)
def suspend_vm(self, timeout=230):
queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.sleep.apply_async(args=[self.vm_name,
self.mem_dump['path']],
queue=queue_name
).get(timeout=timeout)
def wake_up_vm(self, timeout=60):
queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.wake_up.apply_async(args=[self.vm_name,
self.mem_dump['path']],
queue=queue_name
).get(timeout=timeout)
def delete_mem_dump(self, timeout=15):
queue_name = self.mem_dump['datastore'].get_remote_queue_name(
'storage', 'fast')
from storage.tasks.storage_tasks import delete_dump
delete_dump.apply_async(args=[self.mem_dump['path']],
queue=queue_name).get(timeout=timeout)
def reallocate_node(self, activity):
with activity.sub_activity(
'scheduling',
readable_name=ugettext_noop("schedule")) as sa:
sa.result = node = self.select_node()
return node
def allocate_node(self, activity):
if self.node is not None:
return None
with activity.sub_activity(
'scheduling',
readable_name=ugettext_noop("schedule")) as sa:
sa.result = self.node = self.select_node()
def allocate_node(self):
if self.node is None:
self.node = self.select_node()
self.save()
return self.node
......@@ -1009,12 +837,6 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
return merged_acts
def get_screenshot(self, timeout=5):
queue_name = self.get_remote_queue_name("vm", "fast")
return vm_tasks.screenshot.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def get_latest_activity_in_progress(self):
try:
return InstanceActivity.objects.filter(
......
......@@ -28,12 +28,13 @@ from django.conf import settings
from sizefield.utils import filesizeformat
from celery.exceptions import TimeLimitExceeded
from celery.contrib.abortable import AbortableAsyncResult
from celery.exceptions import TimeLimitExceeded, TimeoutError
from common.models import (
create_readable, humanize_exception, HumanReadableException
)
from common.operations import Operation, register_operation
from common.operations import Operation, register_operation, SubOperationMixin
from manager.scheduler import SchedulerError
from .tasks.local_tasks import (
abortable_async_instance_operation, abortable_async_node_operation,
......@@ -42,13 +43,46 @@ from .models import (
Instance, InstanceActivity, InstanceTemplate, Interface, Node,
NodeActivity, pwgen
)
from .tasks import agent_tasks, local_agent_tasks
from .tasks import agent_tasks, local_agent_tasks, vm_tasks
from dashboard.store_api import Store, NoStoreException
from storage.tasks import storage_tasks
logger = getLogger(__name__)
class RemoteOperationMixin(object):
remote_timeout = 30
def _operation(self, **kwargs):
args = self._get_remote_args(**kwargs)
return self.task.apply_async(
args=args, queue=self._get_remote_queue()
).get(timeout=self.remote_timeout)
def check_precond(self):
super(RemoteOperationMixin, self).check_precond()
self._get_remote_queue()
class AbortableRemoteOperationMixin(object):
remote_step = property(lambda self: self.remote_timeout / 10)
def _operation(self, task, **kwargs):
args = self._get_remote_args(**kwargs),
remote = self.task.apply_async(
args=args, queue=self._get_remote_queue())
for i in xrange(0, self.remote_timeout, self.remote_step):
try:
return remote.get(timeout=self.remote_step)
except TimeoutError as e:
if task is not None and task.is_aborted():
AbortableAsyncResult(remote.id).abort()
raise humanize_exception(ugettext_noop(
"Operation aborted by user."), e)
class InstanceOperation(Operation):
acl_level = 'owner'
async_operation = abortable_async_instance_operation
......@@ -100,12 +134,13 @@ class InstanceOperation(Operation):
"parent activity does not match the user "
"provided as parameter.")
return parent.create_sub(code_suffix=self.activity_code_suffix,
readable_name=name,
resultant_state=self.resultant_state)
return parent.create_sub(
code_suffix=self.get_activity_code_suffix(),
readable_name=name, resultant_state=self.resultant_state)
else:
return InstanceActivity.create(
code_suffix=self.activity_code_suffix, instance=self.instance,
code_suffix=self.get_activity_code_suffix(),
instance=self.instance,
readable_name=name, user=user,
concurrency_check=self.concurrency_check,
resultant_state=self.resultant_state)
......@@ -116,9 +151,19 @@ class InstanceOperation(Operation):
return False
class RemoteInstanceOperation(RemoteOperationMixin, InstanceOperation):
remote_queue = ('vm', 'fast')
def _get_remote_queue(self):
return self.instance.get_remote_queue_name(*self.remote_queue)
def _get_remote_args(self, **kwargs):
return [self.instance.vm_name]
@register_operation
class AddInterfaceOperation(InstanceOperation):
activity_code_suffix = 'add_interface'
id = 'add_interface'
name = _("add interface")
description = _("Add a new network interface for the specified VLAN to "
......@@ -146,10 +191,8 @@ class AddInterfaceOperation(InstanceOperation):
if self.instance.is_running:
try:
with activity.sub_activity(
'attach_network',
readable_name=ugettext_noop("attach network")):
self.instance.attach_network(net)
self.instance._attach_network(
interface=net, parent_activity=activity)
except Exception as e:
if hasattr(e, 'libvirtError'):
self.rollback(net, activity)
......@@ -165,7 +208,6 @@ class AddInterfaceOperation(InstanceOperation):
@register_operation
class CreateDiskOperation(InstanceOperation):
activity_code_suffix = 'create_disk'
id = 'create_disk'
name = _("create disk")
description = _("Create and attach empty disk to the virtual machine.")
......@@ -192,11 +234,7 @@ class CreateDiskOperation(InstanceOperation):
readable_name=ugettext_noop("deploying disk")
):
disk.deploy()
with activity.sub_activity(
'attach_disk',
readable_name=ugettext_noop("attach disk")
):
self.instance.attach_disk(disk)
self.instance._attach_disk(parent_activity=activity, disk=disk)
def get_activity_name(self, kwargs):
return create_readable(
......@@ -205,9 +243,8 @@ class CreateDiskOperation(InstanceOperation):
@register_operation
class ResizeDiskOperation(InstanceOperation):
class ResizeDiskOperation(RemoteInstanceOperation):
activity_code_suffix = 'resize_disk'
id = 'resize_disk'
name = _("resize disk")
description = _("Resize the virtual disk image. "
......@@ -215,9 +252,12 @@ class ResizeDiskOperation(InstanceOperation):
required_perms = ('storage.resize_disk', )
accept_states = ('RUNNING', )
async_queue = "localhost.man.slow"
remote_queue = ('vm', 'slow')
task = vm_tasks.resize_disk
def _operation(self, user, disk, size, activity):
self.instance.resize_disk_live(disk, size)
def _get_remote_args(self, disk, size, **kwargs):
return (super(ResizeDiskOperation, self)
._get_remote_args(**kwargs) + [disk.path, size])
def get_activity_name(self, kwargs):
return create_readable(
......@@ -227,7 +267,6 @@ class ResizeDiskOperation(InstanceOperation):
@register_operation
class DownloadDiskOperation(InstanceOperation):
activity_code_suffix = 'download_disk'
id = 'download_disk'
name = _("download disk")
description = _("Download and attach disk image (ISO file) for the "
......@@ -257,16 +296,11 @@ class DownloadDiskOperation(InstanceOperation):
# TODO iso (cd) hot-plug is not supported by kvm/guests
if self.instance.is_running and disk.type not in ["iso"]:
with activity.sub_activity(
'attach_disk',
readable_name=ugettext_noop("attach disk")
):
self.instance.attach_disk(disk)
self.instance._attach_disk(parent_activity=activity, disk=disk)
@register_operation
class DeployOperation(InstanceOperation):
activity_code_suffix = 'deploy'
id = 'deploy'
name = _("deploy")
description = _("Deploy and start the virtual machine (including storage "
......@@ -293,22 +327,14 @@ class DeployOperation(InstanceOperation):
def _operation(self, activity, timeout=15):
# Allocate VNC port and host node
self.instance.allocate_vnc_port()
self.instance.allocate_node(activity)
self.instance.allocate_node()
# Deploy virtual images
with activity.sub_activity(
'deploying_disks', readable_name=ugettext_noop(
"deploy disks")):
self.instance.deploy_disks()
self.instance._deploy_disks(parent_activity=activity)
# Deploy VM on remote machine
if self.instance.state not in ['PAUSED']:
rn = create_readable(ugettext_noop("deploy virtual machine"),
ugettext_noop("deploy vm to %(node)s"),
node=self.instance.node)
with activity.sub_activity(
'deploying_vm', readable_name=rn) as deploy_act:
deploy_act.result = self.instance.deploy_vm(timeout=timeout)
self.instance._deploy_vm(parent_activity=activity)
# Establish network connection (vmdriver)
with activity.sub_activity(
......@@ -321,20 +347,57 @@ class DeployOperation(InstanceOperation):
except:
pass
# Resume vm
with activity.sub_activity(
'booting', readable_name=ugettext_noop(
"boot virtual machine")):
self.instance.resume_vm(timeout=timeout)
self.instance._resume_vm(parent_activity=activity)
if self.instance.has_agent:
activity.sub_activity('os_boot', readable_name=ugettext_noop(
"wait operating system loading"), interruptible=True)
@register_operation
class DeployVmOperation(SubOperationMixin, RemoteInstanceOperation):
id = "_deploy_vm"
name = _("deploy vm")
description = _("Deploy virtual machine.")
remote_queue = ("vm", "slow")
task = vm_tasks.deploy
def _get_remote_args(self, **kwargs):
return [self.instance.get_vm_desc()]
# intentionally not calling super
def get_activity_name(self, kwargs):
return create_readable(ugettext_noop("deploy virtual machine"),
ugettext_noop("deploy vm to %(node)s"),
node=self.instance.node)
@register_operation
class DeployDisksOperation(SubOperationMixin, InstanceOperation):
id = "_deploy_disks"
name = _("deploy disks")
description = _("Deploy all associated disks.")
def _operation(self):
devnums = list(ascii_lowercase) # a-z
for disk in self.instance.disks.all():
# assign device numbers
if disk.dev_num in devnums:
devnums.remove(disk.dev_num)
else:
disk.dev_num = devnums.pop(0)
disk.save()
# deploy disk
disk.deploy()
@register_operation
class ResumeVmOperation(SubOperationMixin, RemoteInstanceOperation):
id = "_resume_vm"
name = _("boot virtual machine")
remote_queue = ("vm", "slow")
task = vm_tasks.resume
@register_operation
class DestroyOperation(InstanceOperation):
activity_code_suffix = 'destroy'
id = 'destroy'
name = _("destroy")
description = _("Permanently destroy virtual machine, its network "
......@@ -352,11 +415,7 @@ class DestroyOperation(InstanceOperation):
self.instance.destroy_net()
if self.instance.node:
# Delete virtual machine
with activity.sub_activity(
'destroying_vm',
readable_name=ugettext_noop("destroy virtual machine")):
self.instance.delete_vm()
self.instance._delete_vm(parent_activity=activity)
# Destroy disks
with activity.sub_activity(
......@@ -366,7 +425,7 @@ class DestroyOperation(InstanceOperation):
# Delete mem. dump if exists
try:
self.instance.delete_mem_dump()
self.instance._delete_mem_dump(parent_activity=activity)
except:
pass
......@@ -377,10 +436,30 @@ class DestroyOperation(InstanceOperation):
self.instance.destroyed_at = timezone.now()
self.instance.save()
@register_operation
class DeleteVmOperation(SubOperationMixin, RemoteInstanceOperation):
id = "_delete_vm"
name = _("destroy virtual machine")
task = vm_tasks.destroy
# if e.libvirtError and "Domain not found" in str(e):
@register_operation
class DeleteMemDumpOperation(RemoteOperationMixin, SubOperationMixin,
InstanceOperation):
id = "_delete_mem_dump"
name = _("removing memory dump")
task = storage_tasks.delete_dump
def _get_remote_queue(self):
return self.instance.mem_dump['datastore'].get_remote_queue_name(
"storage", "fast")
def _get_remote_args(self, **kwargs):
return [self.instance.mem_dump['path']]
@register_operation
class MigrateOperation(InstanceOperation):
activity_code_suffix = 'migrate'
class MigrateOperation(RemoteInstanceOperation):
id = 'migrate'
name = _("migrate")
description = _("Move virtual machine to an other worker node with a few "
......@@ -389,6 +468,14 @@ class MigrateOperation(InstanceOperation):
superuser_required = True
accept_states = ('RUNNING', )
async_queue = "localhost.man.slow"
task = vm_tasks.migrate
remote_queue = ("vm", "slow")
timeout = 600
def _get_remote_args(self, to_node, **kwargs):
return (super(MigrateOperation, self)._get_remote_args(**kwargs)
+ [to_node.host.hostname, True])
# TODO handle non-live migration
def rollback(self, activity):
with activity.sub_activity(
......@@ -396,14 +483,19 @@ class MigrateOperation(InstanceOperation):
"redeploy network (rollback)")):
self.instance.deploy_net()
def _operation(self, activity, to_node=None, timeout=120):
def _operation(self, activity, to_node=None):
if not to_node:
to_node = self.instance.reallocate_node(activity)
with activity.sub_activity('scheduling',
readable_name=ugettext_noop(
"schedule")) as sa:
to_node = self.instance.select_node()
sa.result = to_node
try:
with activity.sub_activity(
'migrate_vm', readable_name=create_readable(
ugettext_noop("migrate to %(node)s"), node=to_node)):
self.instance.migrate_vm(to_node=to_node, timeout=timeout)
super(MigrateOperation, self)._operation(to_node=to_node)
except Exception as e:
if hasattr(e, 'libvirtError'):
self.rollback(activity)
......@@ -427,17 +519,17 @@ class MigrateOperation(InstanceOperation):
@register_operation
class RebootOperation(InstanceOperation):
activity_code_suffix = 'reboot'
class RebootOperation(RemoteInstanceOperation):
id = 'reboot'
name = _("reboot")
description = _("Warm reboot virtual machine by sending Ctrl+Alt+Del "
"signal to its console.")
required_perms = ()
accept_states = ('RUNNING', )
task = vm_tasks.reboot
def _operation(self, activity, timeout=5):
self.instance.reboot_vm(timeout=timeout)
def _operation(self, activity):
super(RebootOperation, self)._operation()
if self.instance.has_agent:
activity.sub_activity('os_boot', readable_name=ugettext_noop(
"wait operating system loading"), interruptible=True)
......@@ -445,7 +537,6 @@ class RebootOperation(InstanceOperation):
@register_operation
class RemoveInterfaceOperation(InstanceOperation):
activity_code_suffix = 'remove_interface'
id = 'remove_interface'
name = _("remove interface")
description = _("Remove the specified network interface and erase IP "
......@@ -456,11 +547,8 @@ class RemoveInterfaceOperation(InstanceOperation):
def _operation(self, activity, user, system, interface):
if self.instance.is_running:
with activity.sub_activity(
'detach_network',
readable_name=ugettext_noop("detach network")
):
self.instance.detach_network(interface)
self.instance._detach_network(interface=interface,
parent_activity=activity)
interface.shutdown()
interface.destroy()
......@@ -473,7 +561,6 @@ class RemoveInterfaceOperation(InstanceOperation):
@register_operation
class RemoveDiskOperation(InstanceOperation):
activity_code_suffix = 'remove_disk'
id = 'remove_disk'
name = _("remove disk")
description = _("Remove the specified disk from the virtual machine, and "
......@@ -483,11 +570,7 @@ class RemoveDiskOperation(InstanceOperation):
def _operation(self, activity, user, system, disk):
if self.instance.is_running and disk.type not in ["iso"]:
with activity.sub_activity(
'detach_disk',
readable_name=ugettext_noop('detach disk')
):
self.instance.detach_disk(disk)
self.instance._detach_disk(disk=disk, parent_activity=activity)
with activity.sub_activity(
'destroy_disk',
readable_name=ugettext_noop('destroy disk')
......@@ -500,16 +583,16 @@ class RemoveDiskOperation(InstanceOperation):
@register_operation
class ResetOperation(InstanceOperation):
activity_code_suffix = 'reset'
class ResetOperation(RemoteInstanceOperation):
id = 'reset'
name = _("reset")
description = _("Cold reboot virtual machine (power cycle).")
required_perms = ()
accept_states = ('RUNNING', )
task = vm_tasks.reset
def _operation(self, activity, timeout=5):
self.instance.reset_vm(timeout=timeout)
def _operation(self, activity):
super(ResetOperation, self)._operation()
if self.instance.has_agent:
activity.sub_activity('os_boot', readable_name=ugettext_noop(
"wait operating system loading"), interruptible=True)
......@@ -517,7 +600,6 @@ class ResetOperation(InstanceOperation):
@register_operation
class SaveAsTemplateOperation(InstanceOperation):
activity_code_suffix = 'save_as_template'
id = 'save_as_template'
name = _("save as template")
description = _("Save virtual machine as a template so they can be shared "
......@@ -613,8 +695,8 @@ class SaveAsTemplateOperation(InstanceOperation):
@register_operation
class ShutdownOperation(InstanceOperation):
activity_code_suffix = 'shutdown'
class ShutdownOperation(AbortableRemoteOperationMixin,
RemoteInstanceOperation):
id = 'shutdown'
name = _("shutdown")
description = _("Try to halt virtual machine by a standard ACPI signal, "
......@@ -625,9 +707,12 @@ class ShutdownOperation(InstanceOperation):
required_perms = ()
accept_states = ('RUNNING', )
resultant_state = 'STOPPED'
task = vm_tasks.shutdown
remote_queue = ("vm", "slow")
timeout = 120
def _operation(self, task=None):
self.instance.shutdown_vm(task=task)
def _operation(self, task):
super(ShutdownOperation, self)._operation(task=task)
self.instance.yield_node()
def on_abort(self, activity, error):
......@@ -644,7 +729,6 @@ class ShutdownOperation(InstanceOperation):
@register_operation
class ShutOffOperation(InstanceOperation):
activity_code_suffix = 'shut_off'
id = 'shut_off'
name = _("shut off")
description = _("Forcibly halt a virtual machine without notifying the "
......@@ -663,16 +747,12 @@ class ShutOffOperation(InstanceOperation):
with activity.sub_activity('shutdown_net'):
self.instance.shutdown_net()
# Delete virtual machine
with activity.sub_activity('delete_vm'):
self.instance.delete_vm()
self.instance._delete_vm(parent_activity=activity)
self.instance.yield_node()
@register_operation
class SleepOperation(InstanceOperation):
activity_code_suffix = 'sleep'
id = 'sleep'
name = _("sleep")
description = _("Suspend virtual machine. This means the machine is "
......@@ -698,25 +778,30 @@ class SleepOperation(InstanceOperation):
else:
activity.resultant_state = 'ERROR'
def _operation(self, activity, timeout=240):
# Destroy networks
with activity.sub_activity('shutdown_net', readable_name=ugettext_noop(
def _operation(self, activity):
with activity.sub_activity('shutdown_net',
readable_name=ugettext_noop(
"shutdown network")):
self.instance.shutdown_net()
self.instance._suspend_vm(parent_activity=activity)
self.instance.yield_node()
# Suspend vm
with activity.sub_activity('suspending',
readable_name=ugettext_noop(
"suspend virtual machine")):
self.instance.suspend_vm(timeout=timeout)
@register_operation
class SuspendVmOperation(SubOperationMixin, RemoteInstanceOperation):
id = "_suspend_vm"
name = _("suspend virtual machine")
task = vm_tasks.sleep
remote_queue = ("vm", "slow")
timeout = 600
self.instance.yield_node()
# VNC port needs to be kept
def _get_remote_args(self, **kwargs):
return (super(SleepOperation.SuspendVmOperation, self)
._get_remote_args(**kwargs)
+ [self.instance.mem_dump['path']])
@register_operation
class WakeUpOperation(InstanceOperation):
activity_code_suffix = 'wake_up'
id = 'wake_up'
name = _("wake up")
description = _("Wake up sleeping (suspended) virtual machine. This will "
......@@ -735,16 +820,13 @@ class WakeUpOperation(InstanceOperation):
else:
activity.resultant_state = 'ERROR'
def _operation(self, activity, timeout=60):
def _operation(self, activity):
# Schedule vm
self.instance.allocate_vnc_port()
self.instance.allocate_node(activity)
self.instance.allocate_node()
# Resume vm
with activity.sub_activity(
'resuming', readable_name=ugettext_noop(
"resume virtual machine")):
self.instance.wake_up_vm(timeout=timeout)
self.instance._wake_up_vm(parent_activity=activity)
# Estabilish network connection (vmdriver)
with activity.sub_activity(
......@@ -757,10 +839,22 @@ class WakeUpOperation(InstanceOperation):
except:
pass
@register_operation
class WakeUpVmOperation(SubOperationMixin, RemoteInstanceOperation):
id = "_wake_up_vm"
name = _("resume virtual machine")
task = vm_tasks.wake_up
remote_queue = ("vm", "slow")
timeout = 600
def _get_remote_args(self, **kwargs):
return (super(WakeUpOperation.WakeUpVmOperation, self)
._get_remote_args(**kwargs)
+ [self.instance.mem_dump['path']])
@register_operation
class RenewOperation(InstanceOperation):
activity_code_suffix = 'renew'
id = 'renew'
name = _("renew")
description = _("Virtual machines are suspended and destroyed after they "
......@@ -795,7 +889,6 @@ class RenewOperation(InstanceOperation):
@register_operation
class ChangeStateOperation(InstanceOperation):
activity_code_suffix = 'emergency_change_state'
id = 'emergency_change_state'
name = _("emergency state change")
description = _("Change the virtual machine state to NOSTATE. This "
......@@ -825,7 +918,6 @@ class ChangeStateOperation(InstanceOperation):
@register_operation
class RedeployOperation(InstanceOperation):
activity_code_suffix = 'redeploy'
id = 'redeploy'
name = _("redeploy")
description = _("Change the virtual machine state to NOSTATE "
......@@ -879,17 +971,17 @@ class NodeOperation(Operation):
"parent activity does not match the user "
"provided as parameter.")
return parent.create_sub(code_suffix=self.activity_code_suffix,
return parent.create_sub(
code_suffix=self.get_activity_code_suffix(),
readable_name=name)
else:
return NodeActivity.create(code_suffix=self.activity_code_suffix,
node=self.node, user=user,
readable_name=name)
return NodeActivity.create(
code_suffix=self.get_activity_code_suffix(), node=self.node,
user=user, readable_name=name)
@register_operation
class ResetNodeOperation(NodeOperation):
activity_code_suffix = 'reset'
id = 'reset'
name = _("reset")
description = _("Disable missing node and redeploy all instances "
......@@ -918,7 +1010,6 @@ class ResetNodeOperation(NodeOperation):
@register_operation
class FlushOperation(NodeOperation):
activity_code_suffix = 'flush'
id = 'flush'
name = _("flush")
description = _("Passivate node and move all instances to other ones.")
......@@ -939,7 +1030,6 @@ class FlushOperation(NodeOperation):
@register_operation
class ActivateOperation(NodeOperation):
activity_code_suffix = 'activate'
id = 'activate'
name = _("activate")
description = _("Make node active, i.e. scheduler is allowed to deploy "
......@@ -960,7 +1050,6 @@ class ActivateOperation(NodeOperation):
@register_operation
class PassivateOperation(NodeOperation):
activity_code_suffix = 'passivate'
id = 'passivate'
name = _("passivate")
description = _("Make node passive, i.e. scheduler is denied to deploy "
......@@ -982,7 +1071,6 @@ class PassivateOperation(NodeOperation):
@register_operation
class DisableOperation(NodeOperation):
activity_code_suffix = 'disable'
id = 'disable'
name = _("disable")
description = _("Disable node.")
......@@ -1006,8 +1094,7 @@ class DisableOperation(NodeOperation):
@register_operation
class ScreenshotOperation(InstanceOperation):
activity_code_suffix = 'screenshot'
class ScreenshotOperation(RemoteInstanceOperation):
id = 'screenshot'
name = _("screenshot")
description = _("Get a screenshot about the virtual machine's console. A "
......@@ -1016,14 +1103,11 @@ class ScreenshotOperation(InstanceOperation):
acl_level = "owner"
required_perms = ()
accept_states = ('RUNNING', )
def _operation(self):
return self.instance.get_screenshot(timeout=20)
task = vm_tasks.screenshot
@register_operation
class RecoverOperation(InstanceOperation):
activity_code_suffix = 'recover'
id = 'recover'
name = _("recover")
description = _("Try to recover virtual machine disks from destroyed "
......@@ -1051,7 +1135,6 @@ class RecoverOperation(InstanceOperation):
@register_operation
class ResourcesOperation(InstanceOperation):
activity_code_suffix = 'Resources change'
id = 'resources_change'
name = _("resources change")
description = _("Change resources of a stopped virtual machine.")
......@@ -1098,7 +1181,6 @@ class EnsureAgentMixin(object):
@register_operation
class PasswordResetOperation(EnsureAgentMixin, InstanceOperation):
activity_code_suffix = 'password_reset'
id = 'password_reset'
name = _("password reset")
description = _("Generate and set a new login password on the virtual "
......@@ -1119,7 +1201,6 @@ class PasswordResetOperation(EnsureAgentMixin, InstanceOperation):
@register_operation
class MountStoreOperation(EnsureAgentMixin, InstanceOperation):
activity_code_suffix = 'mount_store'
id = 'mount_store'
name = _("mount store")
description = _(
......@@ -1144,3 +1225,61 @@ class MountStoreOperation(EnsureAgentMixin, InstanceOperation):
password = user.profile.smb_password
agent_tasks.mount_store.apply_async(
queue=queue, args=(inst.vm_name, host, username, password))
class AbstractDiskOperation(SubOperationMixin, RemoteInstanceOperation):
required_perms = ()
def _get_remote_args(self, disk, **kwargs):
return (super(AbstractDiskOperation, self)._get_remote_args(**kwargs)
+ [disk.get_vmdisk_desc()])
@register_operation
class AttachDisk(AbstractDiskOperation):
id = "_attach_disk"
name = _("attach disk")
task = vm_tasks.attach_disk
class DetachMixin(object):
def _operation(self, activity, **kwargs):
try:
super(DetachMixin, self)._operation(**kwargs)
except Exception as e:
if hasattr(e, "libvirtError") and "not found" in unicode(e):
activity.result = create_readable(
ugettext_noop("Resource was not found."),
ugettext_noop("Resource was not found. %(exception)s"),
exception=unicode(e))
else:
raise
@register_operation
class DetachDisk(DetachMixin, AbstractDiskOperation):
id = "_detach_disk"
name = _("detach disk")
task = vm_tasks.detach_disk
class AbstractNetworkOperation(SubOperationMixin, RemoteInstanceOperation):
required_perms = ()
def _get_remote_args(self, interface, **kwargs):
return (super(AbstractNetworkOperation, self)
._get_remote_args(**kwargs) + [interface.get_vmnetwork_desc()])
@register_operation
class AttachNetwork(AbstractNetworkOperation):
id = "_attach_network"
name = _("attach network")
task = vm_tasks.attach_network
@register_operation
class DetachNetwork(DetachMixin, AbstractNetworkOperation):
id = "_detach_network"
name = _("detach network")
task = vm_tasks.detach_network
......@@ -29,7 +29,8 @@ from ..models import (
)
from ..models.instance import find_unused_port, ActivityInProgressError
from ..operations import (
DeployOperation, DestroyOperation, FlushOperation, MigrateOperation,
RemoteOperationMixin, DeployOperation, DestroyOperation, FlushOperation,
MigrateOperation,
)
......@@ -89,7 +90,7 @@ class InstanceTestCase(TestCase):
self.assertFalse(inst.save.called)
def test_destroy_sets_destroyed(self):
inst = Mock(destroyed_at=None, spec=Instance,
inst = Mock(destroyed_at=None, spec=Instance, _delete_vm=Mock(),
InstanceDestroyedError=Instance.InstanceDestroyedError)
inst.node = MagicMock(spec=Node)
inst.disks.all.return_value = []
......@@ -105,7 +106,8 @@ class InstanceTestCase(TestCase):
inst.node = MagicMock(spec=Node)
inst.status = 'RUNNING'
migrate_op = MigrateOperation(inst)
with patch('vm.models.instance.vm_tasks.migrate') as migr:
with patch('vm.operations.vm_tasks.migrate') as migr, \
patch.object(RemoteOperationMixin, "_operation"):
act = MagicMock()
with patch.object(MigrateOperation, 'create_activity',
return_value=act):
......@@ -121,7 +123,8 @@ class InstanceTestCase(TestCase):
inst.node = MagicMock(spec=Node)
inst.status = 'RUNNING'
migrate_op = MigrateOperation(inst)
with patch('vm.models.instance.vm_tasks.migrate') as migr:
with patch('vm.operations.vm_tasks.migrate') as migr, \
patch.object(RemoteOperationMixin, "_operation"):
inst.select_node.side_effect = AssertionError
act = MagicMock()
with patch.object(MigrateOperation, 'create_activity',
......@@ -138,19 +141,22 @@ class InstanceTestCase(TestCase):
inst.status = 'RUNNING'
e = Exception('abc')
setattr(e, 'libvirtError', '')
inst.migrate_vm.side_effect = e
migrate_op = MigrateOperation(inst)
with patch('vm.models.instance.vm_tasks.migrate') as migr:
migrate_op.rollback = Mock()
with patch('vm.operations.vm_tasks.migrate') as migr, \
patch.object(RemoteOperationMixin, '_operation') as remop:
act = MagicMock()
remop.side_effect = e
with patch.object(MigrateOperation, 'create_activity',
return_value=act):
self.assertRaises(Exception, migrate_op, system=True)
remop.assert_called()
migr.apply_async.assert_called()
self.assertIn(call.sub_activity(
u'rollback_net', readable_name=u'redeploy network (rollback)'),
act.mock_calls)
inst.allocate_node.assert_called()
u'scheduling', readable_name=u'schedule'), act.mock_calls)
migrate_op.rollback.assert_called()
inst.select_node.assert_called()
def test_status_icon(self):
inst = MagicMock(spec=Instance)
......
......@@ -52,15 +52,14 @@ class MigrateOperationTestCase(TestCase):
inst = MagicMock(spec=Instance)
act = MagicMock(spec=InstanceActivity)
inst.migrate_vm = MagicMock(side_effect=MigrateException())
op = MigrateOperation(inst)
op._get_remote_args = MagicMock(side_effect=MigrateException())
inst.select_node = MagicMock(return_value='test')
inst.reallocate_node = (
lambda act: Instance.reallocate_node(inst, act))
self.assertRaises(
MigrateException, MigrateOperation(inst)._operation,
MigrateException, op._operation,
act, to_node=None)
assert inst.select_node.called
inst.migrate_vm.assert_called_once_with(to_node='test', timeout=120)
op._get_remote_args.assert_called_once_with(to_node='test')
class RebootOperationTestCase(TestCase):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment