Commit 95199687 by Őry Máté

vm: first batch of moving Instance methods to SubOperations

parent afa5deac
...@@ -20,7 +20,6 @@ from datetime import timedelta ...@@ -20,7 +20,6 @@ from datetime import timedelta
from functools import partial from functools import partial
from importlib import import_module from importlib import import_module
from logging import getLogger from logging import getLogger
from string import ascii_lowercase
from warnings import warn from warnings import warn
from celery.exceptions import TimeoutError from celery.exceptions import TimeoutError
...@@ -738,74 +737,6 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -738,74 +737,6 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
""" """
return scheduler.select_node(self, Node.objects.all()) return scheduler.select_node(self, Node.objects.all())
def attach_disk(self, disk, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.attach_disk.apply_async(
args=[self.vm_name,
disk.get_vmdisk_desc()],
queue=queue_name
).get(timeout=timeout)
def detach_disk(self, disk, timeout=15):
try:
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.detach_disk.apply_async(
args=[self.vm_name,
disk.get_vmdisk_desc()],
queue=queue_name
).get(timeout=timeout)
except Exception as e:
if e.libvirtError and "not found" in str(e):
logger.debug("Disk %s was not found."
% disk.name)
else:
raise
def attach_network(self, network, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.attach_network.apply_async(
args=[self.vm_name,
network.get_vmnetwork_desc()],
queue=queue_name
).get(timeout=timeout)
def detach_network(self, network, timeout=15):
try:
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.detach_network.apply_async(
args=[self.vm_name,
network.get_vmnetwork_desc()],
queue=queue_name
).get(timeout=timeout)
except Exception as e:
if e.libvirtError and "not found" in str(e):
logger.debug("Interface %s was not found."
% (network.__unicode__()))
else:
raise
def resize_disk_live(self, disk, size, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.resize_disk.apply_async(
args=[self.vm_name, disk.path, size],
queue=queue_name).get(timeout=timeout)
disk.size = size
disk.save()
def deploy_disks(self):
"""Deploy all associated disks.
"""
devnums = list(ascii_lowercase) # a-z
for disk in self.disks.all():
# assign device numbers
if disk.dev_num in devnums:
devnums.remove(disk.dev_num)
else:
disk.dev_num = devnums.pop(0)
disk.save()
# deploy disk
disk.deploy()
def destroy_disks(self): def destroy_disks(self):
"""Destroy all associated disks. """Destroy all associated disks.
""" """
...@@ -830,19 +761,6 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -830,19 +761,6 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
for net in self.interface_set.all(): for net in self.interface_set.all():
net.shutdown() net.shutdown()
def delete_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'fast')
try:
return vm_tasks.destroy.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
except Exception as e:
if e.libvirtError and "Domain not found" in str(e):
logger.debug("Domain %s was not found at %s"
% (self.vm_name, queue_name))
else:
raise
def deploy_vm(self, timeout=15): def deploy_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'slow') queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.deploy.apply_async(args=[self.get_vm_desc()], return vm_tasks.deploy.apply_async(args=[self.get_vm_desc()],
...@@ -857,24 +775,6 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -857,24 +775,6 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
queue=queue_name queue=queue_name
).get(timeout=timeout) ).get(timeout=timeout)
def reboot_vm(self, timeout=5):
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.reboot.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def reset_vm(self, timeout=5):
queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.reset.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def resume_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.resume.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def shutdown_vm(self, task=None, step=5): def shutdown_vm(self, task=None, step=5):
queue_name = self.get_remote_queue_name('vm', 'slow') queue_name = self.get_remote_queue_name('vm', 'slow')
logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name, logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name,
......
...@@ -33,7 +33,7 @@ from celery.exceptions import TimeLimitExceeded ...@@ -33,7 +33,7 @@ from celery.exceptions import TimeLimitExceeded
from common.models import ( from common.models import (
create_readable, humanize_exception, HumanReadableException create_readable, humanize_exception, HumanReadableException
) )
from common.operations import Operation, register_operation from common.operations import Operation, register_operation, SubOperationMixin
from manager.scheduler import SchedulerError from manager.scheduler import SchedulerError
from .tasks.local_tasks import ( from .tasks.local_tasks import (
abortable_async_instance_operation, abortable_async_node_operation, abortable_async_instance_operation, abortable_async_node_operation,
...@@ -218,11 +218,7 @@ class CreateDiskOperation(InstanceOperation): ...@@ -218,11 +218,7 @@ class CreateDiskOperation(InstanceOperation):
readable_name=ugettext_noop("deploying disk") readable_name=ugettext_noop("deploying disk")
): ):
disk.deploy() disk.deploy()
with activity.sub_activity( self.instance._attach_disk(parent_activity=activity, disk=disk)
'attach_disk',
readable_name=ugettext_noop("attach disk")
):
self.instance.attach_disk(disk)
def get_activity_name(self, kwargs): def get_activity_name(self, kwargs):
return create_readable( return create_readable(
...@@ -231,7 +227,7 @@ class CreateDiskOperation(InstanceOperation): ...@@ -231,7 +227,7 @@ class CreateDiskOperation(InstanceOperation):
@register_operation @register_operation
class ResizeDiskOperation(InstanceOperation): class ResizeDiskOperation(RemoteInstanceOperation):
id = 'resize_disk' id = 'resize_disk'
name = _("resize disk") name = _("resize disk")
...@@ -240,9 +236,12 @@ class ResizeDiskOperation(InstanceOperation): ...@@ -240,9 +236,12 @@ class ResizeDiskOperation(InstanceOperation):
required_perms = ('storage.resize_disk', ) required_perms = ('storage.resize_disk', )
accept_states = ('RUNNING', ) accept_states = ('RUNNING', )
async_queue = "localhost.man.slow" async_queue = "localhost.man.slow"
remote_queue = ('vm', 'slow')
task = vm_tasks.resize_disk
def _operation(self, user, disk, size, activity): def _get_remote_args(self, disk, size, **kwargs):
self.instance.resize_disk_live(disk, size) return (super(ResizeDiskOperation, self)
._get_remote_args(**kwargs) + [disk.path, size])
def get_activity_name(self, kwargs): def get_activity_name(self, kwargs):
return create_readable( return create_readable(
...@@ -281,11 +280,7 @@ class DownloadDiskOperation(InstanceOperation): ...@@ -281,11 +280,7 @@ class DownloadDiskOperation(InstanceOperation):
# TODO iso (cd) hot-plug is not supported by kvm/guests # TODO iso (cd) hot-plug is not supported by kvm/guests
if self.instance.is_running and disk.type not in ["iso"]: if self.instance.is_running and disk.type not in ["iso"]:
with activity.sub_activity( self.instance._attach_disk(parent_activity=activity, disk=disk)
'attach_disk',
readable_name=ugettext_noop("attach disk")
):
self.instance.attach_disk(disk)
@register_operation @register_operation
...@@ -319,10 +314,7 @@ class DeployOperation(InstanceOperation): ...@@ -319,10 +314,7 @@ class DeployOperation(InstanceOperation):
self.instance.allocate_node() self.instance.allocate_node()
# Deploy virtual images # Deploy virtual images
with activity.sub_activity( self.instance._deploy_disks(parent_activity=activity)
'deploying_disks', readable_name=ugettext_noop(
"deploy disks")):
self.instance.deploy_disks()
# Deploy VM on remote machine # Deploy VM on remote machine
if self.instance.state not in ['PAUSED']: if self.instance.state not in ['PAUSED']:
...@@ -344,16 +336,36 @@ class DeployOperation(InstanceOperation): ...@@ -344,16 +336,36 @@ class DeployOperation(InstanceOperation):
except: except:
pass pass
# Resume vm self.instance._resume_vm(parent_activity=activity)
with activity.sub_activity(
'booting', readable_name=ugettext_noop(
"boot virtual machine")):
self.instance.resume_vm(timeout=timeout)
if self.instance.has_agent: if self.instance.has_agent:
activity.sub_activity('os_boot', readable_name=ugettext_noop( activity.sub_activity('os_boot', readable_name=ugettext_noop(
"wait operating system loading"), interruptible=True) "wait operating system loading"), interruptible=True)
@register_operation
class DeployDisksOperation(SubOperationMixin, InstanceOperation):
id = "_deploy_disks"
name = _("deploy disks")
description = _("Deploy all associated disks.")
def _operation(self):
devnums = list(ascii_lowercase) # a-z
for disk in self.instance.disks.all():
# assign device numbers
if disk.dev_num in devnums:
devnums.remove(disk.dev_num)
else:
disk.dev_num = devnums.pop(0)
disk.save()
# deploy disk
disk.deploy()
class ResumeVmOperation(SubOperationMixin, InstanceOperation):
id = "_resume_vm"
name = _("boot virtual machine")
remote_queue = ("vm", "slow")
task = vm_tasks.resume
@register_operation @register_operation
class DestroyOperation(InstanceOperation): class DestroyOperation(InstanceOperation):
...@@ -374,11 +386,7 @@ class DestroyOperation(InstanceOperation): ...@@ -374,11 +386,7 @@ class DestroyOperation(InstanceOperation):
self.instance.destroy_net() self.instance.destroy_net()
if self.instance.node: if self.instance.node:
# Delete virtual machine self.instance._delete_vm(parent_activity=activity)
with activity.sub_activity(
'destroying_vm',
readable_name=ugettext_noop("destroy virtual machine")):
self.instance.delete_vm()
# Destroy disks # Destroy disks
with activity.sub_activity( with activity.sub_activity(
...@@ -459,16 +467,17 @@ class MigrateOperation(InstanceOperation): ...@@ -459,16 +467,17 @@ class MigrateOperation(InstanceOperation):
@register_operation @register_operation
class RebootOperation(InstanceOperation): class RebootOperation(RemoteInstanceOperation):
id = 'reboot' id = 'reboot'
name = _("reboot") name = _("reboot")
description = _("Warm reboot virtual machine by sending Ctrl+Alt+Del " description = _("Warm reboot virtual machine by sending Ctrl+Alt+Del "
"signal to its console.") "signal to its console.")
required_perms = () required_perms = ()
accept_states = ('RUNNING', ) accept_states = ('RUNNING', )
task = vm_tasks.reboot
def _operation(self, activity, timeout=5): def _operation(self, activity):
self.instance.reboot_vm(timeout=timeout) super(RebootOperation, self)._operation()
if self.instance.has_agent: if self.instance.has_agent:
activity.sub_activity('os_boot', readable_name=ugettext_noop( activity.sub_activity('os_boot', readable_name=ugettext_noop(
"wait operating system loading"), interruptible=True) "wait operating system loading"), interruptible=True)
...@@ -486,11 +495,8 @@ class RemoveInterfaceOperation(InstanceOperation): ...@@ -486,11 +495,8 @@ class RemoveInterfaceOperation(InstanceOperation):
def _operation(self, activity, user, system, interface): def _operation(self, activity, user, system, interface):
if self.instance.is_running: if self.instance.is_running:
with activity.sub_activity( self.instance._detach_network(interface=interface,
'detach_network', parent_activity=activity)
readable_name=ugettext_noop("detach network")
):
self.instance.detach_network(interface)
interface.shutdown() interface.shutdown()
interface.destroy() interface.destroy()
...@@ -512,11 +518,7 @@ class RemoveDiskOperation(InstanceOperation): ...@@ -512,11 +518,7 @@ class RemoveDiskOperation(InstanceOperation):
def _operation(self, activity, user, system, disk): def _operation(self, activity, user, system, disk):
if self.instance.is_running and disk.type not in ["iso"]: if self.instance.is_running and disk.type not in ["iso"]:
with activity.sub_activity( self.instance._detach_disk(disk=disk, parent_activity=activity)
'detach_disk',
readable_name=ugettext_noop('detach disk')
):
self.instance.detach_disk(disk)
with activity.sub_activity( with activity.sub_activity(
'destroy_disk', 'destroy_disk',
readable_name=ugettext_noop('destroy disk') readable_name=ugettext_noop('destroy disk')
...@@ -529,15 +531,16 @@ class RemoveDiskOperation(InstanceOperation): ...@@ -529,15 +531,16 @@ class RemoveDiskOperation(InstanceOperation):
@register_operation @register_operation
class ResetOperation(InstanceOperation): class ResetOperation(RemoteInstanceOperation):
id = 'reset' id = 'reset'
name = _("reset") name = _("reset")
description = _("Cold reboot virtual machine (power cycle).") description = _("Cold reboot virtual machine (power cycle).")
required_perms = () required_perms = ()
accept_states = ('RUNNING', ) accept_states = ('RUNNING', )
task = vm_tasks.reset
def _operation(self, activity, timeout=5): def _operation(self, activity):
self.instance.reset_vm(timeout=timeout) super(ResetOperation, self)._operation()
if self.instance.has_agent: if self.instance.has_agent:
activity.sub_activity('os_boot', readable_name=ugettext_noop( activity.sub_activity('os_boot', readable_name=ugettext_noop(
"wait operating system loading"), interruptible=True) "wait operating system loading"), interruptible=True)
...@@ -688,10 +691,7 @@ class ShutOffOperation(InstanceOperation): ...@@ -688,10 +691,7 @@ class ShutOffOperation(InstanceOperation):
with activity.sub_activity('shutdown_net'): with activity.sub_activity('shutdown_net'):
self.instance.shutdown_net() self.instance.shutdown_net()
# Delete virtual machine self.instance._delete_vm(parent_activity=activity)
with activity.sub_activity('delete_vm'):
self.instance.delete_vm()
self.instance.yield_node() self.instance.yield_node()
...@@ -1098,3 +1098,61 @@ class MountStoreOperation(EnsureAgentMixin, InstanceOperation): ...@@ -1098,3 +1098,61 @@ class MountStoreOperation(EnsureAgentMixin, InstanceOperation):
password = user.profile.smb_password password = user.profile.smb_password
agent_tasks.mount_store.apply_async( agent_tasks.mount_store.apply_async(
queue=queue, args=(inst.vm_name, host, username, password)) queue=queue, args=(inst.vm_name, host, username, password))
class AbstractDiskOperation(SubOperationMixin, RemoteInstanceOperation):
required_perms = ()
def _get_remote_args(self, disk, **kwargs):
return (super(AbstractDiskOperation, self)._get_remote_args(**kwargs)
+ [disk.get_vmdisk_desc()])
@register_operation
class AttachDisk(AbstractDiskOperation):
id = "_attach_disk"
name = _("attach disk")
task = vm_tasks.attach_disk
class DetachMixin(object):
def _operation(self, activity, **kwargs):
try:
super(DetachMixin, self)._operation(**kwargs)
except Exception as e:
if hasattr(e, "libvirtError") and "not found" in unicode(e):
activity.result = create_readable(
ugettext_noop("Resource was not found."),
ugettext_noop("Resource was not found. %(exception)s"),
exception=unicode(e))
else:
raise
@register_operation
class DetachDisk(DetachMixin, AbstractDiskOperation):
id = "_detach_disk"
name = _("detach disk")
task = vm_tasks.detach_disk
class AbstractNetworkOperation(SubOperationMixin, RemoteInstanceOperation):
required_perms = ()
def _get_remote_args(self, interface, **kwargs):
return (super(AbstractNetworkOperation, self)
._get_remote_args(**kwargs) + [interface.get_vmnetwork_desc()])
@register_operation
class AttachNetwork(AbstractNetworkOperation):
id = "_attach_network"
name = _("attach network")
task = vm_tasks.attach_network
@register_operation
class DetachNetwork(DetachMixin, AbstractNetworkOperation):
id = "_detach_network"
name = _("detach network")
task = vm_tasks.detach_network
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment