Commit b6ba0036 by Dudás Ádám

vm: use operations

parent 48dcff49
...@@ -16,11 +16,12 @@ from .instance import pre_state_changed ...@@ -16,11 +16,12 @@ from .instance import pre_state_changed
from .network import InterfaceTemplate from .network import InterfaceTemplate
from .network import Interface from .network import Interface
from .node import Node from .node import Node
from .operation import Operation
__all__ = [ __all__ = [
'InstanceActivity', 'InstanceActiveManager', 'BaseResourceConfigModel', 'InstanceActivity', 'InstanceActiveManager', 'BaseResourceConfigModel',
'NamedBaseResourceConfig', 'VirtualMachineDescModel', 'InstanceTemplate', 'NamedBaseResourceConfig', 'VirtualMachineDescModel', 'InstanceTemplate',
'Instance', 'instance_activity', 'post_state_changed', 'pre_state_changed', 'Instance', 'instance_activity', 'post_state_changed', 'pre_state_changed',
'InterfaceTemplate', 'Interface', 'Trait', 'Node', 'NodeActivity', 'Lease', 'InterfaceTemplate', 'Interface', 'Trait', 'Node', 'NodeActivity', 'Lease',
'node_activity', 'node_activity', 'Operation',
] ]
...@@ -3,7 +3,6 @@ from datetime import timedelta ...@@ -3,7 +3,6 @@ from datetime import timedelta
from logging import getLogger from logging import getLogger
from importlib import import_module from importlib import import_module
from warnings import warn from warnings import warn
import string
import django.conf import django.conf
from django.contrib.auth.models import User from django.contrib.auth.models import User
...@@ -16,14 +15,13 @@ from django.dispatch import Signal ...@@ -16,14 +15,13 @@ from django.dispatch import Signal
from django.utils import timezone from django.utils import timezone
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from celery.exceptions import TimeLimitExceeded
from model_utils import Choices from model_utils import Choices
from model_utils.models import TimeStampedModel, StatusModel from model_utils.models import TimeStampedModel, StatusModel
from taggit.managers import TaggableManager from taggit.managers import TaggableManager
from acl.models import AclBase from acl.models import AclBase
from storage.models import Disk from storage.models import Disk
from ..tasks import local_tasks, vm_tasks, agent_tasks from ..tasks import vm_tasks, agent_tasks
from .activity import (ActivityInProgressError, instance_activity, from .activity import (ActivityInProgressError, instance_activity,
InstanceActivity) InstanceActivity)
from .common import BaseResourceConfigModel, Lease from .common import BaseResourceConfigModel, Lease
...@@ -256,6 +254,13 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -256,6 +254,13 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
self.instance = instance self.instance = instance
def __getattr__(self, name):
if name in self._ops:
return self._ops[name](self)
else:
raise AttributeError("%s object has no attribute '%s'" %
(self.__class__.__name__, name))
def __unicode__(self): def __unicode__(self):
parts = (self.name, "(" + str(self.id) + ")") parts = (self.name, "(" + str(self.id) + ")")
return " ".join(s for s in parts if s != "") return " ".join(s for s in parts if s != "")
...@@ -770,53 +775,13 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -770,53 +775,13 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
self.renew(which='both', base_activity=act) self.renew(which='both', base_activity=act)
def deploy(self, user=None, task_uuid=None):
"""Deploy new virtual machine with network
:param self: The virtual machine to deploy.
:type self: vm.models.Instance
:param user: The user who's issuing the command.
:type user: django.contrib.auth.models.User
:param task_uuid: The task's UUID, if the command is being executed
asynchronously.
:type task_uuid: str
"""
if self.destroyed_at:
raise self.InstanceDestroyedError(self)
def __on_commit(activity):
activity.resultant_state = 'RUNNING'
with instance_activity(code_suffix='deploy', instance=self,
on_commit=__on_commit, task_uuid=task_uuid,
user=user) as act:
self._schedule_vm(act)
# Deploy virtual images
with act.sub_activity('deploying_disks'):
devnums = list(string.ascii_lowercase) # a-z
for disk in self.disks.all():
# assign device numbers
if disk.dev_num in devnums:
devnums.remove(disk.dev_num)
else:
disk.dev_num = devnums.pop(0)
disk.save()
# deploy disk
disk.deploy()
self._deploy_vm(act)
def deploy_async(self, user=None): def deploy_async(self, user=None):
"""Execute deploy asynchronously. """Execute deploy asynchronously.
""" """
warn('Use self.deploy.async instead.', DeprecationWarning)
logger.debug('Calling async local_tasks.deploy(%s, %s)', logger.debug('Calling async local_tasks.deploy(%s, %s)',
unicode(self), unicode(user)) unicode(self), unicode(user))
return local_tasks.deploy.apply_async(args=[self, user], return self.deploy.async(user=user)
queue="localhost.man")
def _destroy_vm(self, act, timeout=15): def _destroy_vm(self, act, timeout=15):
"""Destroy the virtual machine and its associated networks. """Destroy the virtual machine and its associated networks.
...@@ -865,342 +830,68 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, ...@@ -865,342 +830,68 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel,
self.node = None self.node = None
self.vnc_port = None self.vnc_port = None
def redeploy(self, user=None, task_uuid=None):
"""Redeploy virtual machine with network
:param self: The virtual machine to redeploy.
:param user: The user who's issuing the command.
:type user: django.contrib.auth.models.User
:param task_uuid: The task's UUID, if the command is being executed
asynchronously.
:type task_uuid: str
"""
with instance_activity(code_suffix='redeploy', instance=self,
task_uuid=task_uuid, user=user) as act:
# Destroy VM
if self.node:
self._destroy_vm(act)
self._cleanup_after_destroy_vm(act)
# Deploy VM
self._schedule_vm(act)
self._deploy_vm(act)
def redeploy_async(self, user=None): def redeploy_async(self, user=None):
"""Execute redeploy asynchronously. """Execute redeploy asynchronously.
""" """
return local_tasks.redeploy.apply_async(args=[self, user], warn('Use self.redeploy.async instead.', DeprecationWarning)
queue="localhost.man") return self.redeploy.async(user=user)
def shut_off(self, user=None, task_uuid=None):
"""Shut off VM. (plug-out)
"""
def __on_commit(activity):
activity.resultant_state = 'STOPPED'
with instance_activity(code_suffix='shut_off', instance=self,
task_uuid=task_uuid, user=user,
on_commit=__on_commit) as act:
# Destroy VM
if self.node:
self._destroy_vm(act)
self._cleanup_after_destroy_vm(act)
self.save()
def shut_off_async(self, user=None): def shut_off_async(self, user=None):
"""Shut off VM. (plug-out) """Shut off VM. (plug-out)
""" """
return local_tasks.shut_off.apply_async(args=[self, user], warn('Use self.shut_off.async instead.', DeprecationWarning)
queue="localhost.man") return self.shut_off.async(user=user)
def destroy(self, user=None, task_uuid=None):
"""Remove virtual machine and its networks.
:param self: The virtual machine to destroy.
:type self: vm.models.Instance
:param user: The user who's issuing the command.
:type user: django.contrib.auth.models.User
:param task_uuid: The task's UUID, if the command is being executed
asynchronously.
:type task_uuid: str
"""
if self.destroyed_at:
return # already destroyed, nothing to do here
def __on_commit(activity):
activity.resultant_state = 'DESTROYED'
with instance_activity(code_suffix='destroy', instance=self,
on_commit=__on_commit, task_uuid=task_uuid,
user=user) as act:
if self.node:
self._destroy_vm(act)
# Destroy disks
with act.sub_activity('destroying_disks'):
for disk in self.disks.all():
disk.destroy()
self._cleanup_after_destroy_vm(act)
self.destroyed_at = timezone.now()
self.save()
def destroy_async(self, user=None): def destroy_async(self, user=None):
"""Execute destroy asynchronously. """Execute destroy asynchronously.
""" """
return local_tasks.destroy.apply_async(args=[self, user], warn('Use self.destroy.async instead.', DeprecationWarning)
queue="localhost.man") return self.destroy.async(user=user)
def sleep(self, user=None, task_uuid=None, timeout=60):
"""Suspend virtual machine with memory dump.
"""
if self.status not in ['RUNNING']:
raise self.WrongStateError(self)
def __on_abort(activity, error):
if isinstance(error, TimeLimitExceeded):
activity.resultant_state = None
else:
activity.resultant_state = 'ERROR'
def __on_commit(activity):
activity.resultant_state = 'SUSPENDED'
with instance_activity(code_suffix='sleep', instance=self,
on_abort=__on_abort, on_commit=__on_commit,
task_uuid=task_uuid, user=user) as act:
# Destroy networks
with act.sub_activity('destroying_net'):
for net in self.interface_set.all():
net.shutdown()
# Suspend vm
with act.sub_activity('suspending'):
queue_name = self.get_remote_queue_name('vm')
vm_tasks.sleep.apply_async(args=[self.vm_name,
self.mem_dump['path']],
queue=queue_name
).get(timeout=timeout)
self.node = None
self.save()
def sleep_async(self, user=None): def sleep_async(self, user=None):
"""Execute sleep asynchronously. """Execute sleep asynchronously.
""" """
return local_tasks.sleep.apply_async(args=[self, user], warn('Use self.sleep.async instead.', DeprecationWarning)
queue="localhost.man") return self.sleep.async(user=user)
def wake_up(self, user=None, task_uuid=None, timeout=60):
""" Wake up Virtual Machine from SUSPENDED state.
Power on Virtual Machine and load its memory from dump.
"""
if self.status not in ['SUSPENDED']:
raise self.WrongStateError(self)
def __on_abort(activity, error):
activity.resultant_state = 'ERROR'
def __on_commit(activity):
activity.resultant_state = 'RUNNING'
with instance_activity(code_suffix='wake_up', instance=self,
on_abort=__on_abort, on_commit=__on_commit,
task_uuid=task_uuid, user=user) as act:
# Schedule vm
self._schedule_vm(act)
queue_name = self.get_remote_queue_name('vm')
# Resume vm
with act.sub_activity('resuming'):
vm_tasks.wake_up.apply_async(args=[self.vm_name,
self.mem_dump['path']],
queue=queue_name
).get(timeout=timeout)
# Estabilish network connection (vmdriver)
with act.sub_activity('deploying_net'):
for net in self.interface_set.all():
net.deploy()
# Renew vm
self.renew(which='both', base_activity=act)
def wake_up_async(self, user=None): def wake_up_async(self, user=None):
"""Execute wake_up asynchronously. """Execute wake_up asynchronously.
""" """
return local_tasks.wake_up.apply_async(args=[self, user], warn('Use self.wake_up.async instead.', DeprecationWarning)
queue="localhost.man") return self.wake_up.async(user=user)
def shutdown(self, user=None, task_uuid=None, timeout=120):
"""Shutdown virtual machine with ACPI signal.
"""
def __on_abort(activity, error):
if isinstance(error, TimeLimitExceeded):
activity.resultant_state = None
else:
activity.resultant_state = 'ERROR'
def __on_commit(activity):
activity.resultant_state = 'STOPPED'
with instance_activity(code_suffix='shutdown', instance=self,
on_abort=__on_abort, on_commit=__on_commit,
task_uuid=task_uuid, user=user):
queue_name = self.get_remote_queue_name('vm')
logger.debug("RPC Shutdown at queue: %s, for vm: %s.",
self.vm_name, queue_name)
vm_tasks.shutdown.apply_async(kwargs={'name': self.vm_name},
queue=queue_name
).get(timeout=timeout)
self.node = None
self.vnc_port = None
self.save()
def shutdown_async(self, user=None): def shutdown_async(self, user=None):
"""Execute shutdown asynchronously. """Execute shutdown asynchronously.
""" """
return local_tasks.shutdown.apply_async(args=[self, user], warn('Use self.shutdown.async instead.', DeprecationWarning)
queue="localhost.man") return self.shutdown.async(user=user)
def reset(self, user=None, task_uuid=None, timeout=5):
"""Reset virtual machine (reset button)
"""
with instance_activity(code_suffix='reset', instance=self,
task_uuid=task_uuid, user=user):
queue_name = self.get_remote_queue_name('vm')
vm_tasks.reset.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def reset_async(self, user=None): def reset_async(self, user=None):
"""Execute reset asynchronously. """Execute reset asynchronously.
""" """
return local_tasks.reset.apply_async(args=[self, user], warn('Use self.reset.async instead.', DeprecationWarning)
queue="localhost.man") return self.reset.async(user=user)
def reboot(self, user=None, task_uuid=None, timeout=5):
"""Reboot virtual machine with Ctrl+Alt+Del signal.
"""
with instance_activity(code_suffix='reboot', instance=self,
task_uuid=task_uuid, user=user):
queue_name = self.get_remote_queue_name('vm')
vm_tasks.reboot.apply_async(args=[self.vm_name],
queue=queue_name
).get(timeout=timeout)
def reboot_async(self, user=None): def reboot_async(self, user=None):
"""Execute reboot asynchronously. """ """Execute reboot asynchronously. """
return local_tasks.reboot.apply_async(args=[self, user], warn('Use self.reboot.async instead.', DeprecationWarning)
queue="localhost.man") return self.reboot.async(user=user)
def migrate_async(self, to_node, user=None): def migrate_async(self, to_node, user=None):
"""Execute migrate asynchronously. """ """Execute migrate asynchronously. """
return local_tasks.migrate.apply_async(args=[self, to_node, user], warn('Use self.shut_off.async instead.', DeprecationWarning)
queue="localhost.man") return self.migrate.async(user=user, to_node=to_node)
def migrate(self, to_node=None, user=None, task_uuid=None, timeout=120):
"""Live migrate running vm to another node. """
with instance_activity(code_suffix='migrate', instance=self,
task_uuid=task_uuid, user=user) as act:
if not to_node:
with act.sub_activity('scheduling') as sa:
to_node = self.select_node()
sa.result = to_node
# Destroy networks
with act.sub_activity('destroying_net'):
for net in self.interface_set.all():
net.shutdown()
with act.sub_activity('migrate_vm'):
queue_name = self.get_remote_queue_name('vm')
vm_tasks.migrate.apply_async(args=[self.vm_name,
to_node.host.hostname],
queue=queue_name
).get(timeout=timeout)
# Refresh node information
self.node = to_node
self.save()
# Estabilish network connection (vmdriver)
with act.sub_activity('deploying_net'):
for net in self.interface_set.all():
net.deploy()
def save_as_template_async(self, name, user=None, **kwargs): def save_as_template_async(self, name, user=None, **kwargs):
""" Save as template asynchronusly. """ Save as template asynchronusly.
""" """
return local_tasks.save_as_template.apply_async( warn('Use self.shut_off.async instead.', DeprecationWarning)
args=[self, name, user, kwargs], queue="localhost.man") return self.save_as_template.async(name=name, user=user, **kwargs)
def save_as_template(self, name, task_uuid=None, user=None,
timeout=300, **kwargs):
""" Save Virtual Machine as a Template.
Template can be shared with groups and users.
Users can instantiate Virtual Machines from Templates.
"""
with instance_activity(code_suffix="save_as_template", instance=self,
task_uuid=task_uuid, user=user) as act:
# prepare parameters
params = {
'access_method': self.access_method,
'arch': self.arch,
'boot_menu': self.boot_menu,
'description': self.description,
'lease': self.lease, # Can be problem in new VM
'max_ram_size': self.max_ram_size,
'name': name,
'num_cores': self.num_cores,
'owner': user,
'parent': self.template, # Can be problem
'priority': self.priority,
'ram_size': self.ram_size,
'raw_data': self.raw_data,
'system': self.system,
}
params.update(kwargs)
def __try_save_disk(disk):
try:
return disk.save_as() # can do in parallel
except Disk.WrongDiskTypeError:
return disk
# create template and do additional setup
tmpl = InstanceTemplate(**params)
tmpl.full_clean() # Avoiding database errors.
tmpl.save()
try:
with act.sub_activity('saving_disks'):
tmpl.disks.add(*[__try_save_disk(disk)
for disk in self.disks.all()])
# create interface templates
for i in self.interface_set.all():
i.save_as_template(tmpl)
except:
tmpl.delete()
raise
else:
return tmpl
def shutdown_and_save_as_template(self, name, user=None, task_uuid=None, def shutdown_and_save_as_template(self, name, user=None, task_uuid=None,
**kwargs): **kwargs):
self.shutdown(user, task_uuid) self.shutdown(user=user)
self.save_as_template(name, **kwargs) self.save_as_template(name, user=user, **kwargs)
def get_status_icon(self): def get_status_icon(self):
return { return {
......
...@@ -13,66 +13,3 @@ def async_operation(operation_id, instance_pk, activity_pk, **kwargs): ...@@ -13,66 +13,3 @@ def async_operation(operation_id, instance_pk, activity_pk, **kwargs):
activity.save() activity.save()
return operation._exec_op(activity=activity, **kwargs) return operation._exec_op(activity=activity, **kwargs)
# TODO: Keep synchronised with Instance funcs
@celery.task
def deploy(instance, user):
instance.deploy(task_uuid=deploy.request.id, user=user)
@celery.task
def redeploy(instance, user):
instance.redeploy(task_uuid=redeploy.request.id, user=user)
@celery.task
def shut_off(instance, user):
instance.shut_off(task_uuid=shut_off.request.id, user=user)
@celery.task
def destroy(instance, user):
instance.destroy(task_uuid=destroy.request.id, user=user)
@celery.task
def save_as_template(instance, name, user, params):
instance.save_as_template(name, task_uuid=save_as_template.request.id,
user=user, **params)
@celery.task
def sleep(instance, user):
instance.sleep(task_uuid=sleep.request.id, user=user)
@celery.task
def wake_up(instance, user):
instance.wake_up(task_uuid=wake_up.request.id, user=user)
@celery.task
def shutdown(instance, user):
instance.shutdown(task_uuid=shutdown.request.id, user=user)
@celery.task
def reset(instance, user):
instance.reset(task_uuid=reset.request.id, user=user)
@celery.task
def reboot(instance, user):
instance.reboot(task_uuid=reboot.request.id, user=user)
@celery.task
def migrate(instance, to_node, user):
instance.migrate(to_node, task_uuid=migrate.request.id, user=user)
@celery.task
def flush(node, user):
node.flush(task_uuid=flush.request.id, user=user)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment