Commit 48dcff49 by Dudás Ádám

vm: create VM operations

parent 25356007
...@@ -728,7 +728,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -728,7 +728,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
""" """
return scheduler.select_node(self, Node.objects.all()) return scheduler.select_node(self, Node.objects.all())
def __schedule_vm(self, act): def _schedule_vm(self, act):
"""Schedule the virtual machine as part of a higher level activity. """Schedule the virtual machine as part of a higher level activity.
:param act: Parent activity. :param act: Parent activity.
...@@ -743,7 +743,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -743,7 +743,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
self.save() self.save()
def __deploy_vm(self, act, timeout=15): def _deploy_vm(self, act, timeout=15):
"""Deploy the virtual machine. """Deploy the virtual machine.
:param self: The virtual machine. :param self: The virtual machine.
...@@ -793,7 +793,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -793,7 +793,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
on_commit=__on_commit, task_uuid=task_uuid, on_commit=__on_commit, task_uuid=task_uuid,
user=user) as act: user=user) as act:
self.__schedule_vm(act) self._schedule_vm(act)
# Deploy virtual images # Deploy virtual images
with act.sub_activity('deploying_disks'): with act.sub_activity('deploying_disks'):
...@@ -808,7 +808,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -808,7 +808,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
# deploy disk # deploy disk
disk.deploy() disk.deploy()
self.__deploy_vm(act) self._deploy_vm(act)
def deploy_async(self, user=None): def deploy_async(self, user=None):
"""Execute deploy asynchronously. """Execute deploy asynchronously.
...@@ -818,7 +818,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -818,7 +818,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
return local_tasks.deploy.apply_async(args=[self, user], return local_tasks.deploy.apply_async(args=[self, user],
queue="localhost.man") queue="localhost.man")
def __destroy_vm(self, act, timeout=15): def _destroy_vm(self, act, timeout=15):
"""Destroy the virtual machine and its associated networks. """Destroy the virtual machine and its associated networks.
:param self: The virtual machine. :param self: The virtual machine.
...@@ -844,7 +844,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -844,7 +844,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
else: else:
raise raise
def __cleanup_after_destroy_vm(self, act, timeout=15): def _cleanup_after_destroy_vm(self, act, timeout=15):
"""Clean up the virtual machine's data after destroy. """Clean up the virtual machine's data after destroy.
:param self: The virtual machine. :param self: The virtual machine.
...@@ -881,14 +881,14 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -881,14 +881,14 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
task_uuid=task_uuid, user=user) as act: task_uuid=task_uuid, user=user) as act:
# Destroy VM # Destroy VM
if self.node: if self.node:
self.__destroy_vm(act) self._destroy_vm(act)
self.__cleanup_after_destroy_vm(act) self._cleanup_after_destroy_vm(act)
# Deploy VM # Deploy VM
self.__schedule_vm(act) self._schedule_vm(act)
self.__deploy_vm(act) self._deploy_vm(act)
def redeploy_async(self, user=None): def redeploy_async(self, user=None):
"""Execute redeploy asynchronously. """Execute redeploy asynchronously.
...@@ -907,9 +907,9 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -907,9 +907,9 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
on_commit=__on_commit) as act: on_commit=__on_commit) as act:
# Destroy VM # Destroy VM
if self.node: if self.node:
self.__destroy_vm(act) self._destroy_vm(act)
self.__cleanup_after_destroy_vm(act) self._cleanup_after_destroy_vm(act)
self.save() self.save()
def shut_off_async(self, user=None): def shut_off_async(self, user=None):
...@@ -942,14 +942,14 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -942,14 +942,14 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
user=user) as act: user=user) as act:
if self.node: if self.node:
self.__destroy_vm(act) self._destroy_vm(act)
# Destroy disks # Destroy disks
with act.sub_activity('destroying_disks'): with act.sub_activity('destroying_disks'):
for disk in self.disks.all(): for disk in self.disks.all():
disk.destroy() disk.destroy()
self.__cleanup_after_destroy_vm(act) self._cleanup_after_destroy_vm(act)
self.destroyed_at = timezone.now() self.destroyed_at = timezone.now()
self.save() self.save()
...@@ -1019,7 +1019,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -1019,7 +1019,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
task_uuid=task_uuid, user=user) as act: task_uuid=task_uuid, user=user) as act:
# Schedule vm # Schedule vm
self.__schedule_vm(act) self._schedule_vm(act)
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm')
# Resume vm # Resume vm
......
from __future__ import absolute_import, unicode_literals from __future__ import absolute_import, unicode_literals
from logging import getLogger
from string import ascii_lowercase
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from celery.exceptions import TimeLimitExceeded
from common.models import activity_context from common.models import activity_context
from storage.models import Disk
from ..tasks import vm_tasks
from ..tasks.local_tasks import async_operation from ..tasks.local_tasks import async_operation
from .activity import InstanceActivity from .activity import InstanceActivity
from .instance import Instance from .instance import Instance, InstanceTemplate
logger = getLogger(__name__)
class Operation: class Operation:
...@@ -87,3 +99,373 @@ def register_operation(op_cls, op_id=None): ...@@ -87,3 +99,373 @@ def register_operation(op_cls, op_id=None):
op_id = op_cls.id op_id = op_cls.id
Instance._ops[op_id] = lambda inst: op_cls(inst) Instance._ops[op_id] = lambda inst: op_cls(inst)
class DeployOperation(Operation):
activity_code_suffix = 'deploy'
id = 'deploy'
name = _("deploy")
description = _("""Deploy new virtual machine with network
:param self: The virtual machine to deploy.
:type self: vm.models.Instance
:param user: The user who's issuing the command.
:type user: django.contrib.auth.models.User
:param task_uuid: The task's UUID, if the command is being executed
asynchronously.
:type task_uuid: str
""")
def on_commit(self, activity):
activity.resultant_state = 'RUNNING'
def _operation(self, activity, user=None):
if self.instance.destroyed_at:
raise self.instance.InstanceDestroyedError(self.instance)
self.instance._schedule_vm(activity)
# Deploy virtual images
with activity.sub_activity('deploying_disks'):
devnums = list(ascii_lowercase) # a-z
for disk in self.instance.disks.all():
# assign device numbers
if disk.dev_num in devnums:
devnums.remove(disk.dev_num)
else:
disk.dev_num = devnums.pop(0)
disk.save()
# deploy disk
disk.deploy()
self.instance._deploy_vm(activity)
register_operation(DeployOperation)
class DestroyOperation(Operation):
activity_code_suffix = 'destroy'
id = 'destroy'
name = _("destroy")
description = _("""Remove virtual machine and its networks.
:param self: The virtual machine to destroy.
:type self: vm.models.Instance
:param user: The user who's issuing the command.
:type user: django.contrib.auth.models.User
:param task_uuid: The task's UUID, if the command is being executed
asynchronously.
:type task_uuid: str
""")
def check_precond(self):
if self.instance.destroyed_at:
raise self.instance.InstanceDestroyedError(self.instance)
def on_commit(self, activity):
activity.resultant_state = 'DESTROYED'
def _operation(self, activity, user=None):
if self.instance.node:
self.instance._destroy_vm(activity)
# Destroy disks
with activity.sub_activity('destroying_disks'):
for disk in self.instance.disks.all():
disk.destroy()
self.instance._cleanup_after_destroy_vm(activity)
self.instance.destroyed_at = timezone.now()
self.instance.save()
register_operation(DestroyOperation)
class MigrateOperation(Operation):
activity_code_suffix = 'migrate'
id = 'migrate'
name = _("migrate")
description = _("""Live migrate running vm to another node.""")
def _operation(self, activity, to_node=None, user=None, timeout=120):
if not to_node:
with activity.sub_activity('scheduling') as sa:
to_node = self.instance.select_node()
sa.result = to_node
# Destroy networks
with activity.sub_activity('destroying_net'):
for net in self.instance.interface_set.all():
net.shutdown()
with activity.sub_activity('migrate_vm'):
queue_name = self.instance.get_remote_queue_name('vm')
vm_tasks.migrate.apply_async(args=[self.instance.vm_name,
to_node.host.hostname],
queue=queue_name).get(timeout=timeout)
# Refresh node information
self.instance.node = to_node
self.instance.save()
# Estabilish network connection (vmdriver)
with activity.sub_activity('deploying_net'):
for net in self.instance.interface_set.all():
net.deploy()
register_operation(MigrateOperation)
class RebootOperation(Operation):
activity_code_suffix = 'reboot'
id = 'reboot'
name = _("reboot")
description = _("""Reboot virtual machine with Ctrl+Alt+Del signal.""")
def _operation(self, activity, user=None, timeout=5):
queue_name = self.instance.get_remote_queue_name('vm')
vm_tasks.reboot.apply_async(args=[self.instance.vm_name],
queue=queue_name).get(timeout=timeout)
register_operation(RebootOperation)
class RedeployOperation(Operation):
activity_code_suffix = 'redeploy'
id = 'redeploy'
name = _("redeploy")
description = _("""Redeploy virtual machine with network
:param self: The virtual machine to redeploy.
:param user: The user who's issuing the command.
:type user: django.contrib.auth.models.User
:param task_uuid: The task's UUID, if the command is being executed
asynchronously.
:type task_uuid: str
""")
def _operation(self, activity, user=None):
# Destroy VM
if self.instance.node:
self.instance._destroy_vm(activity)
self.instance._cleanup_after_destroy_vm(activity)
# Deploy VM
self.instance._schedule_vm(activity)
self.instance._deploy_vm(activity)
register_operation(RedeployOperation)
class ResetOperation(Operation):
activity_code_suffix = 'reset'
id = 'reset'
name = _("reset")
description = _("""Reset virtual machine (reset button)""")
def _operation(self, activity, user=None, timeout=5):
queue_name = self.instance.get_remote_queue_name('vm')
vm_tasks.reset.apply_async(args=[self.instance.vm_name],
queue=queue_name).get(timeout=timeout)
register_operation(ResetOperation)
class SaveAsTemplateOperation(Operation):
activity_code_suffix = 'save_as_template'
id = 'save_as_template'
name = _("save as template")
description = _("""Save Virtual Machine as a Template.
Template can be shared with groups and users.
Users can instantiate Virtual Machines from Templates.
""")
def _operation(self, activity, name, user=None, timeout=300, **kwargs):
# prepare parameters
params = {
'access_method': self.instance.access_method,
'arch': self.instance.arch,
'boot_menu': self.instance.boot_menu,
'description': self.instance.description,
'lease': self.instance.lease, # Can be problem in new VM
'max_ram_size': self.instance.max_ram_size,
'name': name,
'num_cores': self.instance.num_cores,
'owner': user,
'parent': self.instance.template, # Can be problem
'priority': self.instance.priority,
'ram_size': self.instance.ram_size,
'raw_data': self.instance.raw_data,
'system': self.instance.system,
}
params.update(kwargs)
def __try_save_disk(disk):
try:
return disk.save_as() # can do in parallel
except Disk.WrongDiskTypeError:
return disk
# create template and do additional setup
tmpl = InstanceTemplate(**params)
tmpl.full_clean() # Avoiding database errors.
tmpl.save()
try:
with activity.sub_activity('saving_disks'):
tmpl.disks.add(*[__try_save_disk(disk)
for disk in self.instance.disks.all()])
# create interface templates
for i in self.instance.interface_set.all():
i.save_as_template(tmpl)
except:
tmpl.delete()
raise
else:
return tmpl
register_operation(SaveAsTemplateOperation)
class ShutdownOperation(Operation):
activity_code_suffix = 'shutdown'
id = 'shutdown'
name = _("shutdown")
description = _("""Shutdown virtual machine with ACPI signal.""")
def on_abort(self, activity, error):
if isinstance(error, TimeLimitExceeded):
activity.resultant_state = None
else:
activity.resultant_state = 'ERROR'
def on_commit(self, activity):
activity.resultant_state = 'STOPPED'
def _operation(self, activity, user=None, timeout=120):
queue_name = self.instance.get_remote_queue_name('vm')
logger.debug("RPC Shutdown at queue: %s, for vm: %s.",
self.instance.vm_name, queue_name) # TODO param order ok?
vm_tasks.shutdown.apply_async(kwargs={'name': self.instance.vm_name},
queue=queue_name).get(timeout=timeout)
self.instance.node = None
self.instance.vnc_port = None
self.instance.save()
register_operation(ShutdownOperation)
class ShutOffOperation(Operation):
activity_code_suffix = 'shut_off'
id = 'shut_off'
name = _("shut off")
description = _("""Shut off VM. (plug-out)""")
def on_commit(activity):
activity.resultant_state = 'STOPPED'
def _operation(self, activity, user=None):
# Destroy VM
if self.instance.node:
self.instance._destroy_vm(activity)
self.instance._cleanup_after_destroy_vm(activity)
self.instance.save()
register_operation(ShutOffOperation)
class SleepOperation(Operation):
activity_code_suffix = 'sleep'
id = 'sleep'
name = _("sleep")
description = _("""Suspend virtual machine with memory dump.""")
def check_precond(self):
if self.instance.status not in ['RUNNING']:
raise self.instance.WrongStateError(self.instance)
def on_abort(self, activity, error):
if isinstance(error, TimeLimitExceeded):
activity.resultant_state = None
else:
activity.resultant_state = 'ERROR'
def on_commit(self, activity):
activity.resultant_state = 'SUSPENDED'
def _operation(self, activity, user=None, timeout=60):
# Destroy networks
with activity.sub_activity('destroying_net'):
for net in self.instance.interface_set.all():
net.shutdown()
# Suspend vm
with activity.sub_activity('suspending'):
queue_name = self.instance.get_remote_queue_name('vm')
vm_tasks.sleep.apply_async(args=[self.instance.vm_name,
self.instance.mem_dump['path']],
queue=queue_name).get(timeout=timeout)
self.instance.node = None
self.instance.save()
register_operation(SleepOperation)
class WakeUpOperation(Operation):
activity_code_suffix = 'wake_up'
id = 'wake_up'
name = _("wake up")
description = _("""Wake up Virtual Machine from SUSPENDED state.
Power on Virtual Machine and load its memory from dump.
""")
def check_precond(self):
if self.instance.status not in ['SUSPENDED']:
raise self.instance.WrongStateError(self.instance)
def on_abort(self, activity, error):
activity.resultant_state = 'ERROR'
def on_commit(self, activity):
activity.resultant_state = 'RUNNING'
def _operation(self, activity, user=None, timeout=60):
# Schedule vm
self.instance._schedule_vm(activity)
queue_name = self.instance.get_remote_queue_name('vm')
# Resume vm
with activity.sub_activity('resuming'):
vm_tasks.wake_up.apply_async(args=[self.instance.vm_name,
self.instance.mem_dump['path']],
queue=queue_name).get(timeout=timeout)
# Estabilish network connection (vmdriver)
with activity.sub_activity('deploying_net'):
for net in self.instance.interface_set.all():
net.deploy()
# Renew vm
self.instance.renew(which='both', base_activity=activity)
register_operation(WakeUpOperation)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment