Commit 6636656e by Dudás Ádám

vm: refactor operations

parent ebfd1921
...@@ -2,6 +2,7 @@ from __future__ import absolute_import, unicode_literals ...@@ -2,6 +2,7 @@ from __future__ import absolute_import, unicode_literals
from datetime import timedelta from datetime import timedelta
from importlib import import_module from importlib import import_module
from logging import getLogger from logging import getLogger
from string import ascii_lowercase
from warnings import warn from warnings import warn
import django.conf import django.conf
...@@ -726,65 +727,48 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -726,65 +727,48 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
""" """
return scheduler.select_node(self, Node.objects.all()) return scheduler.select_node(self, Node.objects.all())
def _schedule_vm(self, act): def deploy_disks(self):
"""Schedule the virtual machine as part of a higher level activity. """Deploy all associated disks.
:param act: Parent activity.
""" """
# Find unused port for VNC devnums = list(ascii_lowercase) # a-z
if self.vnc_port is None: for disk in self.disks.all():
self.vnc_port = find_unused_vnc_port() # assign device numbers
if disk.dev_num in devnums:
# Schedule devnums.remove(disk.dev_num)
if self.node is None: else:
self.node = self.select_node() disk.dev_num = devnums.pop(0)
disk.save()
self.save() # deploy disk
disk.deploy()
def _deploy_vm(self, act, timeout=15):
"""Deploy the virtual machine.
:param self: The virtual machine.
:param act: Parent activity. def destroy_disks(self):
"""Destroy all associated disks.
""" """
queue_name = self.get_remote_queue_name('vm') for disk in self.disks.all():
disk.destroy()
# Deploy VM on remote machine def deploy_net(self):
with act.sub_activity('deploying_vm') as deploy_act: """Deploy all associated network interfaces.
deploy_act.result = vm_tasks.deploy.apply_async( """
args=[self.get_vm_desc()],
queue=queue_name).get(timeout=timeout)
# Estabilish network connection (vmdriver)
with act.sub_activity('deploying_net'):
for net in self.interface_set.all(): for net in self.interface_set.all():
net.deploy() net.deploy()
# Resume vm def destroy_net(self):
with act.sub_activity('booting'): """Destroy all associated network interfaces.
vm_tasks.resume.apply_async(args=[self.vm_name],
queue=queue_name).get(timeout=timeout)
self.renew(which='both', base_activity=act)
def _destroy_vm(self, act, timeout=15):
"""Destroy the virtual machine and its associated networks.
:param self: The virtual machine.
:param act: Parent activity.
""" """
# Destroy networks
with act.sub_activity('destroying_net'):
for net in self.interface_set.all(): for net in self.interface_set.all():
net.destroy() net.destroy()
# Destroy virtual machine def shutdown_net(self):
with act.sub_activity('destroying_vm'): """Shutdown all associated network interfaces.
"""
for net in self.interface_set.all():
net.shutdown()
def delete_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm')
try: try:
vm_tasks.destroy.apply_async(args=[self.vm_name], return vm_tasks.destroy.apply_async(args=[self.vm_name],
queue=queue_name queue=queue_name
).get(timeout=timeout) ).get(timeout=timeout)
except Exception as e: except Exception as e:
...@@ -794,26 +778,85 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -794,26 +778,85 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
else: else:
raise raise
def _cleanup_after_destroy_vm(self, act, timeout=15): def deploy_vm(self, timeout=15):
"""Clean up the virtual machine's data after destroy. queue_name = self.get_remote_queue_name('vm')
return vm_tasks.deploy.apply_async(args=[self.get_vm_desc()],
queue=queue_name
).get(timeout=timeout)
:param self: The virtual machine. def migrate_vm(self, to_node, timeout=120):
queue_name = self.get_remote_queue_name('vm')
return vm_tasks.migrate.apply_async(args=[self.vm_name,
to_node.host.hostname],
queue=queue_name
).get(timeout=timeout)
:param act: Parent activity. def reboot_vm(self, timeout=5):
""" queue_name = self.get_remote_queue_name('vm')
# Delete mem. dump if exists return vm_tasks.reboot.apply_async(args=[self.vm_name],
try: queue=queue_name
).get(timeout=timeout)
def reset_vm(self, timeout=5):
queue_name = self.get_remote_queue_name('vm')
return vm_tasks.reset.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def resume_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm')
return vm_tasks.resume.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def shutdown_vm(self, timeout=120):
queue_name = self.get_remote_queue_name('vm')
logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name,
self.vm_name)
return vm_tasks.shutdown.apply_async(kwargs={'name': self.vm_name},
queue=queue_name
).get(timeout=timeout)
def suspend_vm(self, timeout=60):
queue_name = self.get_remote_queue_name('vm')
return vm_tasks.sleep.apply_async(args=[self.vm_name,
self.mem_dump['path']],
queue=queue_name
).get(timeout=timeout)
def wake_up_vm(self, timeout=60):
queue_name = self.get_remote_queue_name('vm')
return vm_tasks.wake_up.apply_async(args=[self.vm_name,
self.mem_dump['path']],
queue=queue_name
).get(timeout=timeout)
def delete_mem_dump(self, timeout=15):
queue_name = self.mem_dump['datastore'].get_remote_queue_name( queue_name = self.mem_dump['datastore'].get_remote_queue_name(
'storage') 'storage')
from storage.tasks.remote_tasks import delete_dump from storage.tasks.remote_tasks import delete_dump
delete_dump.apply_async(args=[self.mem_dump['path']], delete_dump.apply_async(args=[self.mem_dump['path']],
queue=queue_name).get(timeout=timeout) queue=queue_name).get(timeout=timeout)
except:
pass
# Clear node and VNC port association def allocate_node(self):
if self.node is None:
self.node = self.select_node()
self.save()
def yield_node(self):
if self.node is not None:
self.node = None self.node = None
self.save()
def allocate_vnc_port(self):
if self.vnc_port is None:
self.vnc_port = find_unused_vnc_port()
self.save()
def yield_vnc_port(self):
if self.vnc_port is not None:
self.vnc_port = None self.vnc_port = None
self.save()
def get_status_icon(self): def get_status_icon(self):
return { return {
......
from __future__ import absolute_import, unicode_literals from __future__ import absolute_import, unicode_literals
from logging import getLogger from logging import getLogger
from string import ascii_lowercase
from django.core.exceptions import PermissionDenied from django.core.exceptions import PermissionDenied
from django.utils import timezone from django.utils import timezone
...@@ -10,7 +9,6 @@ from celery.exceptions import TimeLimitExceeded ...@@ -10,7 +9,6 @@ from celery.exceptions import TimeLimitExceeded
from common.operations import Operation, register_operation from common.operations import Operation, register_operation
from storage.models import Disk from storage.models import Disk
from .tasks import vm_tasks
from .tasks.local_tasks import async_instance_operation, async_node_operation from .tasks.local_tasks import async_instance_operation, async_node_operation
from .models import ( from .models import (
Instance, InstanceActivity, InstanceTemplate, Node, NodeActivity, Instance, InstanceActivity, InstanceTemplate, Node, NodeActivity,
...@@ -70,23 +68,28 @@ class DeployOperation(InstanceOperation): ...@@ -70,23 +68,28 @@ class DeployOperation(InstanceOperation):
def on_commit(self, activity): def on_commit(self, activity):
activity.resultant_state = 'RUNNING' activity.resultant_state = 'RUNNING'
def _operation(self, activity, user, system): def _operation(self, activity, user, system, timeout=15):
self.instance._schedule_vm(activity) # Allocate VNC port and host node
self.instance.allocate_vnc_port()
self.instance.allocate_node()
# Deploy virtual images # Deploy virtual images
with activity.sub_activity('deploying_disks'): with activity.sub_activity('deploying_disks'):
devnums = list(ascii_lowercase) # a-z self.instance.deploy_disks()
for disk in self.instance.disks.all():
# assign device numbers
if disk.dev_num in devnums:
devnums.remove(disk.dev_num)
else:
disk.dev_num = devnums.pop(0)
disk.save()
# deploy disk
disk.deploy()
self.instance._deploy_vm(activity) # Deploy VM on remote machine
with activity.sub_activity('deploying_vm') as deploy_act:
deploy_act.result = self.instance.deploy_vm(timeout=timeout)
# Establish network connection (vmdriver)
with activity.sub_activity('deploying_net'):
self.instance.deploy_net()
# Resume vm
with activity.sub_activity('booting'):
self.instance.resume_vm(timeout=timeout)
self.instance.renew(which='both', base_activity=activity)
register_instance_operation(DeployOperation) register_instance_operation(DeployOperation)
...@@ -103,14 +106,27 @@ class DestroyOperation(InstanceOperation): ...@@ -103,14 +106,27 @@ class DestroyOperation(InstanceOperation):
def _operation(self, activity, user, system): def _operation(self, activity, user, system):
if self.instance.node: if self.instance.node:
self.instance._destroy_vm(activity) # Destroy networks
with activity.sub_activity('destroying_net'):
self.instance.destroy_net()
# Delete virtual machine
with activity.sub_activity('destroying_vm'):
self.instance.delete_vm()
# Destroy disks # Destroy disks
with activity.sub_activity('destroying_disks'): with activity.sub_activity('destroying_disks'):
for disk in self.instance.disks.all(): self.instance.destroy_disks()
disk.destroy()
self.instance._cleanup_after_destroy_vm(activity) # Delete mem. dump if exists
try:
self.instance.delete_mem_dump()
except:
pass
# Clear node and VNC port association
self.instance.yield_node()
self.instance.yield_vnc_port()
self.instance.destroyed_at = timezone.now() self.instance.destroyed_at = timezone.now()
self.instance.save() self.instance.save()
...@@ -131,23 +147,19 @@ class MigrateOperation(InstanceOperation): ...@@ -131,23 +147,19 @@ class MigrateOperation(InstanceOperation):
to_node = self.instance.select_node() to_node = self.instance.select_node()
sa.result = to_node sa.result = to_node
# Destroy networks # Shutdown networks
with activity.sub_activity('destroying_net'): with activity.sub_activity('shutdown_net'):
for net in self.instance.interface_set.all(): self.instance.shutdown_net()
net.shutdown()
with activity.sub_activity('migrate_vm'): with activity.sub_activity('migrate_vm'):
queue_name = self.instance.get_remote_queue_name('vm') self.instance.migrate_vm(to_node=to_node, timeout=timeout)
vm_tasks.migrate.apply_async(args=[self.instance.vm_name,
to_node.host.hostname],
queue=queue_name).get(timeout=timeout)
# Refresh node information # Refresh node information
self.instance.node = to_node self.instance.node = to_node
self.instance.save() self.instance.save()
# Estabilish network connection (vmdriver) # Estabilish network connection (vmdriver)
with activity.sub_activity('deploying_net'): with activity.sub_activity('deploying_net'):
for net in self.instance.interface_set.all(): self.instance.deploy_net()
net.deploy()
register_instance_operation(MigrateOperation) register_instance_operation(MigrateOperation)
...@@ -160,9 +172,7 @@ class RebootOperation(InstanceOperation): ...@@ -160,9 +172,7 @@ class RebootOperation(InstanceOperation):
description = _("Reboot virtual machine with Ctrl+Alt+Del signal.") description = _("Reboot virtual machine with Ctrl+Alt+Del signal.")
def _operation(self, activity, user, system, timeout=5): def _operation(self, activity, user, system, timeout=5):
queue_name = self.instance.get_remote_queue_name('vm') self.instance.reboot_vm(timeout=timeout)
vm_tasks.reboot.apply_async(args=[self.instance.vm_name],
queue=queue_name).get(timeout=timeout)
register_instance_operation(RebootOperation) register_instance_operation(RebootOperation)
...@@ -175,10 +185,7 @@ class ResetOperation(InstanceOperation): ...@@ -175,10 +185,7 @@ class ResetOperation(InstanceOperation):
description = _("Reset virtual machine (reset button).") description = _("Reset virtual machine (reset button).")
def _operation(self, activity, user, system, timeout=5): def _operation(self, activity, user, system, timeout=5):
queue_name = self.instance.get_remote_queue_name('vm') self.instance.reset_vm(timeout=timeout)
vm_tasks.reset.apply_async(args=[self.instance.vm_name],
queue=queue_name).get(timeout=timeout)
register_instance_operation(ResetOperation) register_instance_operation(ResetOperation)
...@@ -219,7 +226,7 @@ class SaveAsTemplateOperation(InstanceOperation): ...@@ -219,7 +226,7 @@ class SaveAsTemplateOperation(InstanceOperation):
def __try_save_disk(disk): def __try_save_disk(disk):
try: try:
return disk.save_as() # can do in parallel return disk.save_as()
except Disk.WrongDiskTypeError: except Disk.WrongDiskTypeError:
return disk return disk
...@@ -260,14 +267,9 @@ class ShutdownOperation(InstanceOperation): ...@@ -260,14 +267,9 @@ class ShutdownOperation(InstanceOperation):
activity.resultant_state = 'STOPPED' activity.resultant_state = 'STOPPED'
def _operation(self, activity, user, system, timeout=120): def _operation(self, activity, user, system, timeout=120):
queue_name = self.instance.get_remote_queue_name('vm') self.instance.shutdown_vm(timeout=timeout)
logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name, self.instance.yield_node()
self.instance.vm_name) self.instance.yield_vnc_port()
vm_tasks.shutdown.apply_async(kwargs={'name': self.instance.vm_name},
queue=queue_name).get(timeout=timeout)
self.instance.node = None
self.instance.vnc_port = None
self.instance.save()
register_instance_operation(ShutdownOperation) register_instance_operation(ShutdownOperation)
...@@ -283,12 +285,17 @@ class ShutOffOperation(InstanceOperation): ...@@ -283,12 +285,17 @@ class ShutOffOperation(InstanceOperation):
activity.resultant_state = 'STOPPED' activity.resultant_state = 'STOPPED'
def _operation(self, activity, user, system): def _operation(self, activity, user, system):
# Destroy VM # Shutdown networks
if self.instance.node: with activity.sub_activity('shutdown_net'):
self.instance._destroy_vm(activity) self.instance.shutdown_net()
self.instance._cleanup_after_destroy_vm(activity) # Delete virtual machine
self.instance.save() with activity.sub_activity('delete_vm'):
self.instance.delete_vm()
# Clear node and VNC port association
self.instance.yield_node()
self.instance.yield_vnc_port()
register_instance_operation(ShutOffOperation) register_instance_operation(ShutOffOperation)
...@@ -316,18 +323,15 @@ class SleepOperation(InstanceOperation): ...@@ -316,18 +323,15 @@ class SleepOperation(InstanceOperation):
def _operation(self, activity, user, system, timeout=60): def _operation(self, activity, user, system, timeout=60):
# Destroy networks # Destroy networks
with activity.sub_activity('destroying_net'): with activity.sub_activity('shutdown_net'):
for net in self.instance.interface_set.all(): self.instance.shutdown_net()
net.shutdown()
# Suspend vm # Suspend vm
with activity.sub_activity('suspending'): with activity.sub_activity('suspending'):
queue_name = self.instance.get_remote_queue_name('vm') self.instance.suspend_vm(timeout=timeout)
vm_tasks.sleep.apply_async(args=[self.instance.vm_name,
self.instance.mem_dump['path']], self.instance.yield_node()
queue=queue_name).get(timeout=timeout) # VNC port needs to be kept
self.instance.node = None
self.instance.save()
register_instance_operation(SleepOperation) register_instance_operation(SleepOperation)
...@@ -355,19 +359,16 @@ class WakeUpOperation(InstanceOperation): ...@@ -355,19 +359,16 @@ class WakeUpOperation(InstanceOperation):
def _operation(self, activity, user, system, timeout=60): def _operation(self, activity, user, system, timeout=60):
# Schedule vm # Schedule vm
self.instance._schedule_vm(activity) self.instance.allocate_vnc_port()
queue_name = self.instance.get_remote_queue_name('vm') self.instance.allocate_node()
# Resume vm # Resume vm
with activity.sub_activity('resuming'): with activity.sub_activity('resuming'):
vm_tasks.wake_up.apply_async(args=[self.instance.vm_name, self.instance.wake_up_vm(timeout=timeout)
self.instance.mem_dump['path']],
queue=queue_name).get(timeout=timeout)
# Estabilish network connection (vmdriver) # Estabilish network connection (vmdriver)
with activity.sub_activity('deploying_net'): with activity.sub_activity('deploying_net'):
for net in self.instance.interface_set.all(): self.instance.deploy_net()
net.deploy()
# Renew vm # Renew vm
self.instance.renew(which='both', base_activity=activity) self.instance.renew(which='both', base_activity=activity)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment