# Copyright 2014 Budapest University of Technology and Economics (BME IK) # # This file is part of CIRCLE Cloud. # # CIRCLE is free software: you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) # any later version. # # CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along # with CIRCLE. If not, see <http://www.gnu.org/licenses/>. from __future__ import absolute_import, unicode_literals from base64 import encodestring from hashlib import md5 from logging import getLogger import os from re import search from string import ascii_lowercase from StringIO import StringIO from tarfile import TarFile, TarInfo import time from urlparse import urlsplit from django.core.exceptions import PermissionDenied, SuspiciousOperation from django.utils import timezone from django.utils.translation import ugettext_lazy as _, ugettext_noop from django.conf import settings from django.db.models import Q from sizefield.utils import filesizeformat from celery.contrib.abortable import AbortableAsyncResult from celery.exceptions import TimeLimitExceeded, TimeoutError from common.models import ( create_readable, humanize_exception, HumanReadableException ) from common.operations import Operation, register_operation, SubOperationMixin from manager.scheduler import SchedulerError from .tasks.local_tasks import ( abortable_async_instance_operation, abortable_async_node_operation, ) from .models import ( Instance, InstanceActivity, InstanceTemplate, Interface, Node, NodeActivity, pwgen ) from .tasks import agent_tasks, vm_tasks from dashboard.store_api import Store, NoStoreException from firewall.models import Host from monitor.client import Client from storage.tasks import storage_tasks logger = getLogger(__name__) class RemoteOperationMixin(object): remote_timeout = 30 def _operation(self, **kwargs): args = self._get_remote_args(**kwargs) return self.task.apply_async( args=args, queue=self._get_remote_queue() ).get(timeout=self.remote_timeout) def check_precond(self): super(RemoteOperationMixin, self).check_precond() self._get_remote_queue() class AbortableRemoteOperationMixin(object): remote_step = property(lambda self: self.remote_timeout / 10) def _operation(self, task, **kwargs): args = self._get_remote_args(**kwargs), remote = self.task.apply_async( args=args, queue=self._get_remote_queue()) for i in xrange(0, self.remote_timeout, self.remote_step): try: return remote.get(timeout=self.remote_step) except TimeoutError as e: if task is not None and task.is_aborted(): AbortableAsyncResult(remote.id).abort() raise humanize_exception(ugettext_noop( "Operation aborted by user."), e) raise TimeLimitExceeded() class InstanceOperation(Operation): acl_level = 'owner' async_operation = abortable_async_instance_operation host_cls = Instance concurrency_check = True accept_states = None deny_states = None resultant_state = None def __init__(self, instance): super(InstanceOperation, self).__init__(subject=instance) self.instance = instance def check_precond(self): if self.instance.destroyed_at: raise self.instance.InstanceDestroyedError(self.instance) if self.accept_states: if self.instance.status not in self.accept_states: logger.debug("precond failed for %s: %s not in %s", unicode(self.__class__), unicode(self.instance.status), unicode(self.accept_states)) raise self.instance.WrongStateError(self.instance) if self.deny_states: if self.instance.status in self.deny_states: logger.debug("precond failed for %s: %s in %s", unicode(self.__class__), unicode(self.instance.status), unicode(self.accept_states)) raise self.instance.WrongStateError(self.instance) def check_auth(self, user): if not self.instance.has_level(user, self.acl_level): raise humanize_exception(ugettext_noop( "%(acl_level)s level is required for this operation."), PermissionDenied(), acl_level=self.acl_level) super(InstanceOperation, self).check_auth(user=user) def create_activity(self, parent, user, kwargs): name = self.get_activity_name(kwargs) if parent: if parent.instance != self.instance: raise ValueError("The instance associated with the specified " "parent activity does not match the instance " "bound to the operation.") if parent.user != user: raise ValueError("The user associated with the specified " "parent activity does not match the user " "provided as parameter.") return parent.create_sub( code_suffix=self.get_activity_code_suffix(), readable_name=name, resultant_state=self.resultant_state) else: return InstanceActivity.create( code_suffix=self.get_activity_code_suffix(), instance=self.instance, readable_name=name, user=user, concurrency_check=self.concurrency_check, resultant_state=self.resultant_state) def is_preferred(self): """If this is the recommended op in the current state of the instance. """ return False class RemoteInstanceOperation(RemoteOperationMixin, InstanceOperation): remote_queue = ('vm', 'fast') def _get_remote_queue(self): return self.instance.get_remote_queue_name(*self.remote_queue) def _get_remote_args(self, **kwargs): return [self.instance.vm_name] class EnsureAgentMixin(object): accept_states = ('RUNNING', ) def check_precond(self): super(EnsureAgentMixin, self).check_precond() last_boot_time = self.instance.activity_log.filter( succeeded=True, activity_code__in=( "vm.Instance.deploy", "vm.Instance.reset", "vm.Instance.reboot")).latest("finished").finished try: InstanceActivity.objects.filter( activity_code="vm.Instance.agent.starting", started__gt=last_boot_time).latest("started") except InstanceActivity.DoesNotExist: # no agent since last boot raise self.instance.NoAgentError(self.instance) class RemoteAgentOperation(EnsureAgentMixin, RemoteInstanceOperation): remote_queue = ('agent', ) concurrency_check = False @register_operation class AddInterfaceOperation(InstanceOperation): id = 'add_interface' name = _("add interface") description = _("Add a new network interface for the specified VLAN to " "the VM.") required_perms = () accept_states = ('STOPPED', 'PENDING', 'RUNNING') def rollback(self, net, activity): with activity.sub_activity( 'destroying_net', readable_name=ugettext_noop("destroy network (rollback)")): net.destroy() net.delete() def _operation(self, activity, user, system, vlan, managed=None): if not vlan.has_level(user, 'user'): raise humanize_exception(ugettext_noop( "User acces to vlan %(vlan)s is required."), PermissionDenied(), vlan=vlan) if managed is None: managed = vlan.managed net = Interface.create(base_activity=activity, instance=self.instance, managed=managed, owner=user, vlan=vlan) if self.instance.is_running: try: self.instance._attach_network( interface=net, parent_activity=activity) except Exception as e: if hasattr(e, 'libvirtError'): self.rollback(net, activity) raise net.deploy() self.instance._change_ip(parent_activity=activity) self.instance._restart_networking(parent_activity=activity) def get_activity_name(self, kwargs): return create_readable(ugettext_noop("add %(vlan)s interface"), vlan=kwargs['vlan']) @register_operation class CreateDiskOperation(InstanceOperation): id = 'create_disk' name = _("create disk") description = _("Create and attach empty disk to the virtual machine.") required_perms = ('storage.create_empty_disk', ) accept_states = ('STOPPED', 'PENDING', 'RUNNING') def _operation(self, user, size, activity, name=None): from storage.models import Disk if not name: name = "new disk" disk = Disk.create(size=size, name=name, type="qcow2-norm") disk.full_clean() devnums = list(ascii_lowercase) for d in self.instance.disks.all(): devnums.remove(d.dev_num) disk.dev_num = devnums.pop(0) disk.save() self.instance.disks.add(disk) if self.instance.is_running: with activity.sub_activity( 'deploying_disk', readable_name=ugettext_noop("deploying disk") ): disk.deploy() self.instance._attach_disk(parent_activity=activity, disk=disk) def get_activity_name(self, kwargs): return create_readable( ugettext_noop("create disk %(name)s (%(size)s)"), size=filesizeformat(kwargs['size']), name=kwargs['name']) @register_operation class ResizeDiskOperation(RemoteInstanceOperation): id = 'resize_disk' name = _("resize disk") description = _("Resize the virtual disk image. " "Size must be greater value than the actual size.") required_perms = ('storage.resize_disk', ) accept_states = ('RUNNING', ) async_queue = "localhost.man.slow" remote_queue = ('vm', 'slow') task = vm_tasks.resize_disk def _get_remote_args(self, disk, size, **kwargs): return (super(ResizeDiskOperation, self) ._get_remote_args(**kwargs) + [disk.path, size]) def get_activity_name(self, kwargs): return create_readable( ugettext_noop("resize disk %(name)s to %(size)s"), size=filesizeformat(kwargs['size']), name=kwargs['disk'].name) def _operation(self, disk, size): super(ResizeDiskOperation, self)._operation(disk=disk, size=size) disk.size = size disk.save() @register_operation class DownloadDiskOperation(InstanceOperation): id = 'download_disk' name = _("download disk") description = _("Download and attach disk image (ISO file) for the " "virtual machine. Most operating systems do not detect a " "new optical drive, so you may have to reboot the " "machine.") abortable = True has_percentage = True required_perms = ('storage.download_disk', ) accept_states = ('STOPPED', 'PENDING', 'RUNNING') async_queue = "localhost.man.slow" def _operation(self, user, url, task, activity, name=None): from storage.models import Disk disk = Disk.download(url=url, name=name, task=task) devnums = list(ascii_lowercase) for d in self.instance.disks.all(): devnums.remove(d.dev_num) disk.dev_num = devnums.pop(0) disk.full_clean() disk.save() self.instance.disks.add(disk) activity.readable_name = create_readable( ugettext_noop("download %(name)s"), name=disk.name) activity.result = create_readable(ugettext_noop( "Downloading %(url)s is finished. The file md5sum " "is: '%(checksum)s'."), url=url, checksum=disk.checksum) # TODO iso (cd) hot-plug is not supported by kvm/guests if self.instance.is_running and disk.type not in ["iso"]: self.instance._attach_disk(parent_activity=activity, disk=disk) @register_operation class DeployOperation(InstanceOperation): id = 'deploy' name = _("deploy") description = _("Deploy and start the virtual machine (including storage " "and network configuration).") required_perms = () deny_states = ('SUSPENDED', 'RUNNING') resultant_state = 'RUNNING' def is_preferred(self): return self.instance.status in (self.instance.STATUS.STOPPED, self.instance.STATUS.PENDING, self.instance.STATUS.ERROR) def on_abort(self, activity, error): activity.resultant_state = 'STOPPED' def on_commit(self, activity): activity.resultant_state = 'RUNNING' activity.result = create_readable( ugettext_noop("virtual machine successfully " "deployed to node: %(node)s"), node=self.instance.node) def _operation(self, activity, node=None): # Allocate VNC port and host node self.instance.allocate_vnc_port() if node is not None: self.instance.node = node self.instance.save() else: self.instance.allocate_node() # Deploy virtual images self.instance._deploy_disks(parent_activity=activity) # Deploy VM on remote machine if self.instance.state not in ['PAUSED']: self.instance._deploy_vm(parent_activity=activity) # Establish network connection (vmdriver) with activity.sub_activity( 'deploying_net', readable_name=ugettext_noop( "deploy network")): self.instance.deploy_net() try: self.instance.renew(parent_activity=activity) except: pass self.instance._resume_vm(parent_activity=activity) if self.instance.has_agent: activity.sub_activity('os_boot', readable_name=ugettext_noop( "wait operating system loading"), interruptible=True) @register_operation class DeployVmOperation(SubOperationMixin, RemoteInstanceOperation): id = "_deploy_vm" name = _("deploy vm") description = _("Deploy virtual machine.") remote_queue = ("vm", "slow") task = vm_tasks.deploy def _get_remote_args(self, **kwargs): return [self.instance.get_vm_desc()] # intentionally not calling super def get_activity_name(self, kwargs): return create_readable(ugettext_noop("deploy virtual machine"), ugettext_noop("deploy vm to %(node)s"), node=self.instance.node) @register_operation class DeployDisksOperation(SubOperationMixin, InstanceOperation): id = "_deploy_disks" name = _("deploy disks") description = _("Deploy all associated disks.") def _operation(self): devnums = list(ascii_lowercase) # a-z for disk in self.instance.disks.all(): # assign device numbers if disk.dev_num in devnums: devnums.remove(disk.dev_num) else: disk.dev_num = devnums.pop(0) disk.save() # deploy disk disk.deploy() @register_operation class ResumeVmOperation(SubOperationMixin, RemoteInstanceOperation): id = "_resume_vm" name = _("boot virtual machine") remote_queue = ("vm", "slow") task = vm_tasks.resume @register_operation class DestroyOperation(InstanceOperation): id = 'destroy' name = _("destroy") description = _("Permanently destroy virtual machine, its network " "settings and disks.") required_perms = () resultant_state = 'DESTROYED' def _operation(self, activity, system): # Destroy networks with activity.sub_activity( 'destroying_net', readable_name=ugettext_noop("destroy network")): if self.instance.node: self.instance.shutdown_net() self.instance.destroy_net() if self.instance.node: self.instance._delete_vm(parent_activity=activity) # Destroy disks with activity.sub_activity( 'destroying_disks', readable_name=ugettext_noop("destroy disks")): self.instance.destroy_disks() # Delete mem. dump if exists try: self.instance._delete_mem_dump(parent_activity=activity) except: pass # Clear node and VNC port association self.instance.yield_node() self.instance.yield_vnc_port() self.instance.destroyed_at = timezone.now() self.instance.save() @register_operation class DeleteVmOperation(SubOperationMixin, RemoteInstanceOperation): id = "_delete_vm" name = _("destroy virtual machine") task = vm_tasks.destroy # if e.libvirtError and "Domain not found" in str(e): @register_operation class DeleteMemDumpOperation(RemoteOperationMixin, SubOperationMixin, InstanceOperation): id = "_delete_mem_dump" name = _("removing memory dump") task = storage_tasks.delete_dump def _get_remote_queue(self): return self.instance.mem_dump['datastore'].get_remote_queue_name( "storage", "fast") def _get_remote_args(self, **kwargs): return [self.instance.mem_dump['path']] @register_operation class MigrateOperation(RemoteInstanceOperation): id = 'migrate' name = _("migrate") description = _("Move a running virtual machine to an other worker node " "keeping its full state.") required_perms = () superuser_required = True accept_states = ('RUNNING', ) async_queue = "localhost.man.slow" task = vm_tasks.migrate remote_queue = ("vm", "slow") remote_timeout = 1000 def _get_remote_args(self, to_node, live_migration, **kwargs): return (super(MigrateOperation, self)._get_remote_args(**kwargs) + [to_node.host.hostname, live_migration]) def rollback(self, activity): with activity.sub_activity( 'rollback_net', readable_name=ugettext_noop( "redeploy network (rollback)")): self.instance.deploy_net() def _operation(self, activity, to_node=None, live_migration=True): if not to_node: with activity.sub_activity('scheduling', readable_name=ugettext_noop( "schedule")) as sa: to_node = self.instance.select_node() sa.result = to_node try: with activity.sub_activity( 'migrate_vm', readable_name=create_readable( ugettext_noop("migrate to %(node)s"), node=to_node)): super(MigrateOperation, self)._operation( to_node=to_node, live_migration=live_migration) except Exception as e: if hasattr(e, 'libvirtError'): self.rollback(activity) raise # Shutdown networks with activity.sub_activity( 'shutdown_net', readable_name=ugettext_noop( "shutdown network")): self.instance.shutdown_net() # Refresh node information self.instance.node = to_node self.instance.save() # Estabilish network connection (vmdriver) with activity.sub_activity( 'deploying_net', readable_name=ugettext_noop( "deploy network")): self.instance.deploy_net() @register_operation class RebootOperation(RemoteInstanceOperation): id = 'reboot' name = _("reboot") description = _("Warm reboot virtual machine by sending Ctrl+Alt+Del " "signal to its console.") required_perms = () accept_states = ('RUNNING', ) task = vm_tasks.reboot def _operation(self, activity): super(RebootOperation, self)._operation() if self.instance.has_agent: activity.sub_activity('os_boot', readable_name=ugettext_noop( "wait operating system loading"), interruptible=True) @register_operation class RemoveInterfaceOperation(InstanceOperation): id = 'remove_interface' name = _("remove interface") description = _("Remove the specified network interface and erase IP " "address allocations, related firewall rules and " "hostnames.") required_perms = () accept_states = ('STOPPED', 'PENDING', 'RUNNING') def _operation(self, activity, user, system, interface): if self.instance.is_running: self.instance._detach_network(interface=interface, parent_activity=activity) interface.shutdown() interface.destroy() interface.delete() def get_activity_name(self, kwargs): return create_readable(ugettext_noop("remove %(vlan)s interface"), vlan=kwargs['interface'].vlan) @register_operation class RemovePortOperation(InstanceOperation): id = 'remove_port' name = _("close port") description = _("Close the specified port.") concurrency_check = False required_perms = ('vm.config_ports', ) def _operation(self, activity, rule): interface = rule.host.interface_set.get() if interface.instance != self.instance: raise SuspiciousOperation() activity.readable_name = create_readable( ugettext_noop("close %(proto)s/%(port)d on %(host)s"), proto=rule.proto, port=rule.dport, host=rule.host) rule.delete() @register_operation class AddPortOperation(InstanceOperation): id = 'add_port' name = _("open port") description = _("Open the specified port.") concurrency_check = False required_perms = ('vm.config_ports', ) def _operation(self, activity, host, proto, port): if host.interface_set.get().instance != self.instance: raise SuspiciousOperation() host.add_port(proto, private=port) activity.readable_name = create_readable( ugettext_noop("open %(proto)s/%(port)d on %(host)s"), proto=proto, port=port, host=host) @register_operation class RemoveDiskOperation(InstanceOperation): id = 'remove_disk' name = _("remove disk") description = _("Remove the specified disk from the virtual machine, and " "destroy the data.") required_perms = () accept_states = ('STOPPED', 'PENDING', 'RUNNING') def _operation(self, activity, user, system, disk): if self.instance.is_running and disk.type not in ["iso"]: self.instance._detach_disk(disk=disk, parent_activity=activity) with activity.sub_activity( 'destroy_disk', readable_name=ugettext_noop('destroy disk') ): disk.destroy() return self.instance.disks.remove(disk) def get_activity_name(self, kwargs): return create_readable(ugettext_noop('remove disk %(name)s'), name=kwargs["disk"].name) @register_operation class ResetOperation(RemoteInstanceOperation): id = 'reset' name = _("reset") description = _("Cold reboot virtual machine (power cycle).") required_perms = () accept_states = ('RUNNING', ) task = vm_tasks.reset def _operation(self, activity): super(ResetOperation, self)._operation() if self.instance.has_agent: activity.sub_activity('os_boot', readable_name=ugettext_noop( "wait operating system loading"), interruptible=True) @register_operation class SaveAsTemplateOperation(InstanceOperation): id = 'save_as_template' name = _("save as template") description = _("Save virtual machine as a template so they can be shared " "with users and groups. Anyone who has access to a " "template (and to the networks it uses) will be able to " "start an instance of it.") has_percentage = True abortable = True required_perms = ('vm.create_template', ) accept_states = ('RUNNING', 'STOPPED') async_queue = "localhost.man.slow" def is_preferred(self): return (self.instance.is_base and self.instance.status == self.instance.STATUS.RUNNING) @staticmethod def _rename(name): m = search(r" v(\d+)$", name) if m: v = int(m.group(1)) + 1 name = search(r"^(.*) v(\d+)$", name).group(1) else: v = 1 return "%s v%d" % (name, v) def on_abort(self, activity, error): if hasattr(self, 'disks'): for disk in self.disks: disk.destroy() def _operation(self, activity, user, system, name=None, with_shutdown=True, clone=False, task=None, **kwargs): try: self.instance._cleanup(parent_activity=activity, user=user) except: pass if with_shutdown: try: self.instance.shutdown(parent_activity=activity, user=user, task=task) except Instance.WrongStateError: pass # prepare parameters params = { 'access_method': self.instance.access_method, 'arch': self.instance.arch, 'boot_menu': self.instance.boot_menu, 'description': self.instance.description, 'lease': self.instance.lease, # Can be problem in new VM 'max_ram_size': self.instance.max_ram_size, 'name': name or self._rename(self.instance.name), 'num_cores': self.instance.num_cores, 'owner': user, 'parent': self.instance.template or None, # Can be problem 'priority': self.instance.priority, 'ram_size': self.instance.ram_size, 'raw_data': self.instance.raw_data, 'system': self.instance.system, } params.update(kwargs) params.pop("parent_activity", None) from storage.models import Disk def __try_save_disk(disk): try: return disk.save_as(task) except Disk.WrongDiskTypeError: return disk self.disks = [] for disk in self.instance.disks.all(): with activity.sub_activity( 'saving_disk', readable_name=create_readable( ugettext_noop("saving disk %(name)s"), name=disk.name) ): self.disks.append(__try_save_disk(disk)) # create template and do additional setup tmpl = InstanceTemplate(**params) tmpl.full_clean() # Avoiding database errors. tmpl.save() # Copy traits from the VM instance tmpl.req_traits.add(*self.instance.req_traits.all()) if clone: tmpl.clone_acl(self.instance.template) # Add permission for the original owner of the template tmpl.set_level(self.instance.template.owner, 'owner') tmpl.set_level(user, 'owner') try: tmpl.disks.add(*self.disks) # create interface templates for i in self.instance.interface_set.all(): i.save_as_template(tmpl) except: tmpl.delete() raise else: return tmpl @register_operation class ShutdownOperation(AbortableRemoteOperationMixin, RemoteInstanceOperation): id = 'shutdown' name = _("shutdown") description = _("Try to halt virtual machine by a standard ACPI signal, " "allowing the operating system to keep a consistent " "state. The operation will fail if the machine does not " "turn itself off in a period.") abortable = True required_perms = () accept_states = ('RUNNING', ) resultant_state = 'STOPPED' task = vm_tasks.shutdown remote_queue = ("vm", "slow") remote_timeout = 180 def _operation(self, task): super(ShutdownOperation, self)._operation(task=task) self.instance.yield_node() def on_abort(self, activity, error): if isinstance(error, TimeLimitExceeded): activity.result = humanize_exception(ugettext_noop( "The virtual machine did not switch off in the provided time " "limit. Most of the time this is caused by incorrect ACPI " "settings. You can also try to power off the machine from the " "operating system manually."), error) activity.resultant_state = None else: super(ShutdownOperation, self).on_abort(activity, error) @register_operation class ShutOffOperation(InstanceOperation): id = 'shut_off' name = _("shut off") description = _("Forcibly halt a virtual machine without notifying the " "operating system. This operation will even work in cases " "when shutdown does not, but the operating system and the " "file systems are likely to be in an inconsistent state, " "so data loss is also possible. The effect of this " "operation is the same as interrupting the power supply " "of a physical machine.") required_perms = () accept_states = ('RUNNING', 'PAUSED') resultant_state = 'STOPPED' def _operation(self, activity): # Shutdown networks with activity.sub_activity('shutdown_net'): self.instance.shutdown_net() self.instance._delete_vm(parent_activity=activity) self.instance.yield_node() @register_operation class SleepOperation(InstanceOperation): id = 'sleep' name = _("sleep") description = _("Suspend virtual machine. This means the machine is " "stopped and its memory is saved to disk, so if the " "machine is waked up, all the applications will keep " "running. Most of the applications will be able to " "continue even after a long suspension, but those which " "need a continous network connection may fail when " "resumed. In the meantime, the machine will only use " "storage resources, and keep network resources allocated.") required_perms = () accept_states = ('RUNNING', ) resultant_state = 'SUSPENDED' async_queue = "localhost.man.slow" def is_preferred(self): return (not self.instance.is_base and self.instance.status == self.instance.STATUS.RUNNING) def on_abort(self, activity, error): if isinstance(error, TimeLimitExceeded): activity.resultant_state = None else: activity.resultant_state = 'ERROR' def _operation(self, activity, system): with activity.sub_activity('shutdown_net', readable_name=ugettext_noop( "shutdown network")): self.instance.shutdown_net() self.instance._suspend_vm(parent_activity=activity) self.instance.yield_node() @register_operation class SuspendVmOperation(SubOperationMixin, RemoteInstanceOperation): id = "_suspend_vm" name = _("suspend virtual machine") task = vm_tasks.sleep remote_queue = ("vm", "slow") remote_timeout = 1000 def _get_remote_args(self, **kwargs): return (super(SleepOperation.SuspendVmOperation, self) ._get_remote_args(**kwargs) + [self.instance.mem_dump['path']]) @register_operation class WakeUpOperation(InstanceOperation): id = 'wake_up' name = _("wake up") description = _("Wake up sleeping (suspended) virtual machine. This will " "load the saved memory of the system and start the " "virtual machine from this state.") required_perms = () accept_states = ('SUSPENDED', ) resultant_state = 'RUNNING' async_queue = "localhost.man.slow" def is_preferred(self): return self.instance.status == self.instance.STATUS.SUSPENDED def on_abort(self, activity, error): if isinstance(error, SchedulerError): activity.resultant_state = None else: activity.resultant_state = 'ERROR' def _operation(self, activity): # Schedule vm self.instance.allocate_vnc_port() self.instance.allocate_node() # Resume vm self.instance._wake_up_vm(parent_activity=activity) # Estabilish network connection (vmdriver) with activity.sub_activity( 'deploying_net', readable_name=ugettext_noop( "deploy network")): self.instance.deploy_net() try: self.instance.renew(parent_activity=activity) except: pass @register_operation class WakeUpVmOperation(SubOperationMixin, RemoteInstanceOperation): id = "_wake_up_vm" name = _("resume virtual machine") task = vm_tasks.wake_up remote_queue = ("vm", "slow") remote_timeout = 1000 def _get_remote_args(self, **kwargs): return (super(WakeUpOperation.WakeUpVmOperation, self) ._get_remote_args(**kwargs) + [self.instance.mem_dump['path']]) @register_operation class RenewOperation(InstanceOperation): id = 'renew' name = _("renew") description = _("Virtual machines are suspended and destroyed after they " "expire. This operation renews expiration times according " "to the lease type. If the machine is close to the " "expiration, its owner will be notified.") acl_level = "operator" required_perms = () concurrency_check = False def _operation(self, activity, lease=None, force=False, save=False): suspend, delete = self.instance.get_renew_times(lease) if (not force and suspend and self.instance.time_of_suspend and suspend < self.instance.time_of_suspend): raise HumanReadableException.create(ugettext_noop( "Renewing the machine with the selected lease would result " "in its suspension time get earlier than before.")) if (not force and delete and self.instance.time_of_delete and delete < self.instance.time_of_delete): raise HumanReadableException.create(ugettext_noop( "Renewing the machine with the selected lease would result " "in its delete time get earlier than before.")) self.instance.time_of_suspend = suspend self.instance.time_of_delete = delete if save: self.instance.lease = lease self.instance.save() activity.result = create_readable(ugettext_noop( "Renewed to suspend at %(suspend)s and destroy at %(delete)s."), suspend=suspend, delete=delete) @register_operation class ChangeStateOperation(InstanceOperation): id = 'emergency_change_state' name = _("emergency state change") description = _("Change the virtual machine state to NOSTATE. This " "should only be used if manual intervention was needed in " "the virtualization layer, and the machine has to be " "redeployed without losing its storage and network " "resources.") acl_level = "owner" required_perms = ('vm.emergency_change_state', ) concurrency_check = False def _operation(self, user, activity, new_state="NOSTATE", interrupt=False, reset_node=False): activity.resultant_state = new_state if interrupt: msg_txt = ugettext_noop("Activity is forcibly interrupted.") message = create_readable(msg_txt, msg_txt) for i in InstanceActivity.objects.filter( finished__isnull=True, instance=self.instance): i.finish(False, result=message) logger.error('Forced finishing activity %s', i) if reset_node: self.instance.node = None self.instance.save() @register_operation class RedeployOperation(InstanceOperation): id = 'redeploy' name = _("redeploy") description = _("Change the virtual machine state to NOSTATE " "and redeploy the VM. This operation allows starting " "machines formerly running on a failed node.") acl_level = "owner" required_perms = ('vm.redeploy', ) concurrency_check = False def _operation(self, user, activity, with_emergency_change_state=True): if with_emergency_change_state: ChangeStateOperation(self.instance).call( parent_activity=activity, user=user, new_state='NOSTATE', interrupt=False, reset_node=True) else: ShutOffOperation(self.instance).call( parent_activity=activity, user=user) self.instance._update_status() DeployOperation(self.instance).call( parent_activity=activity, user=user) class NodeOperation(Operation): async_operation = abortable_async_node_operation host_cls = Node online_required = True superuser_required = True def __init__(self, node): super(NodeOperation, self).__init__(subject=node) self.node = node def check_precond(self): super(NodeOperation, self).check_precond() if self.online_required and not self.node.online: raise humanize_exception(ugettext_noop( "You cannot call this operation on an offline node."), Exception()) def create_activity(self, parent, user, kwargs): name = self.get_activity_name(kwargs) if parent: if parent.node != self.node: raise ValueError("The node associated with the specified " "parent activity does not match the node " "bound to the operation.") if parent.user != user: raise ValueError("The user associated with the specified " "parent activity does not match the user " "provided as parameter.") return parent.create_sub( code_suffix=self.get_activity_code_suffix(), readable_name=name) else: return NodeActivity.create( code_suffix=self.get_activity_code_suffix(), node=self.node, user=user, readable_name=name) @register_operation class ResetNodeOperation(NodeOperation): id = 'reset' name = _("reset") description = _("Disable missing node and redeploy all instances " "on other ones.") required_perms = () online_required = False async_queue = "localhost.man.slow" def check_precond(self): super(ResetNodeOperation, self).check_precond() if not self.node.enabled or self.node.online: raise humanize_exception(ugettext_noop( "You cannot reset a disabled or online node."), Exception()) def _operation(self, activity, user): if self.node.enabled: DisableOperation(self.node).call(parent_activity=activity, user=user) for i in self.node.instance_set.all(): name = create_readable(ugettext_noop( "migrate %(instance)s (%(pk)s)"), instance=i.name, pk=i.pk) with activity.sub_activity('migrate_instance_%d' % i.pk, readable_name=name): i.redeploy(user=user) @register_operation class FlushOperation(NodeOperation): id = 'flush' name = _("flush") description = _("Passivate node and move all instances to other ones.") required_perms = () async_queue = "localhost.man.slow" def _operation(self, activity, user): if self.node.schedule_enabled: PassivateOperation(self.node).call(parent_activity=activity, user=user) for i in self.node.instance_set.all(): name = create_readable(ugettext_noop( "migrate %(instance)s (%(pk)s)"), instance=i.name, pk=i.pk) with activity.sub_activity('migrate_instance_%d' % i.pk, readable_name=name): i.migrate(user=user) @register_operation class ActivateOperation(NodeOperation): id = 'activate' name = _("activate") description = _("Make node active, i.e. scheduler is allowed to deploy " "virtual machines to it.") required_perms = () def check_precond(self): super(ActivateOperation, self).check_precond() if self.node.enabled and self.node.schedule_enabled: raise humanize_exception(ugettext_noop( "You cannot activate an active node."), Exception()) def _operation(self): self.node.enabled = True self.node.schedule_enabled = True self.node.get_info(invalidate_cache=True) self.node.save() @register_operation class PassivateOperation(NodeOperation): id = 'passivate' name = _("passivate") description = _("Make node passive, i.e. scheduler is denied to deploy " "virtual machines to it, but remaining instances and " "the ones manually migrated will continue running.") required_perms = () def check_precond(self): if self.node.enabled and not self.node.schedule_enabled: raise humanize_exception(ugettext_noop( "You cannot passivate a passive node."), Exception()) super(PassivateOperation, self).check_precond() def _operation(self): self.node.enabled = True self.node.schedule_enabled = False self.node.get_info(invalidate_cache=True) self.node.save() @register_operation class DisableOperation(NodeOperation): id = 'disable' name = _("disable") description = _("Disable node.") required_perms = () online_required = False def check_precond(self): if not self.node.enabled: raise humanize_exception(ugettext_noop( "You cannot disable a disabled node."), Exception()) if self.node.instance_set.exists(): raise humanize_exception(ugettext_noop( "You cannot disable a node which is hosting instances."), Exception()) super(DisableOperation, self).check_precond() def _operation(self): self.node.enabled = False self.node.schedule_enabled = False self.node.save() @register_operation class UpdateNodeOperation(NodeOperation): id = 'update_node' name = _("update node") description = _("Upgrade or install node software (vmdriver, agentdriver, " "monitor-client) with Salt.") required_perms = () online_required = False async_queue = "localhost.man.slow" def minion_cmd(self, module, params, timeout=3600): # see https://git.ik.bme.hu/circle/cloud/issues/377 from salt.client import LocalClient name = self.node.host.hostname client = LocalClient() data = client.cmd( name, module, params, timeout=timeout) try: data = data[name] except KeyError: raise HumanReadableException.create(ugettext_noop( "No minions matched the target.")) if not isinstance(data, dict): raise HumanReadableException.create(ugettext_noop( "Unhandled exception: %(msg)s"), msg=unicode(data)) return data def _operation(self, activity): with activity.sub_activity( 'upgrade_packages', readable_name=ugettext_noop('upgrade packages')) as sa: data = self.minion_cmd('pkg.upgrade', []) upgraded = len(filter(lambda x: x['old'] and x['new'], data.values())) installed = len(filter(lambda x: not x['old'] and x['new'], data.values())) removed = len(filter(lambda x: x['old'] and not x['new'], data.values())) sa.result = create_readable(ugettext_noop( "Upgraded: %(upgraded)s, Installed: %(installed)s, " "Removed: %(removed)s"), upgraded=upgraded, installed=installed, removed=removed) data = self.minion_cmd('state.sls', ['node', 'nfs-client']) failed = 0 for k, v in data.iteritems(): logger.debug('salt state %s %s', k, v) act_name = ': '.join(k.split('_|-')[:2]) if not v["result"] or v["changes"]: act = activity.create_sub( act_name[:70], readable_name=act_name) act.result = create_readable(ugettext_noop( "Changes: %(changes)s Comment: %(comment)s"), changes=v["changes"], comment=v["comment"]) act.finish(v["result"]) if not v["result"]: failed += 1 if failed: raise HumanReadableException.create(ugettext_noop( "Failed: %(failed)s"), failed=failed) @register_operation class ScreenshotOperation(RemoteInstanceOperation): id = 'screenshot' name = _("screenshot") description = _("Get a screenshot about the virtual machine's console. A " "key will be pressed on the keyboard to stop " "screensaver.") acl_level = "owner" required_perms = () accept_states = ('RUNNING', ) task = vm_tasks.screenshot @register_operation class RecoverOperation(InstanceOperation): id = 'recover' name = _("recover") description = _("Try to recover virtual machine disks from destroyed " "state. Network resources (allocations) are already lost, " "so you will have to manually add interfaces afterwards.") acl_level = "owner" required_perms = ('vm.recover', ) accept_states = ('DESTROYED', ) resultant_state = 'PENDING' def check_precond(self): try: super(RecoverOperation, self).check_precond() except Instance.InstanceDestroyedError: pass def _operation(self, user, activity): with activity.sub_activity( 'recover_instance', readable_name=ugettext_noop("recover instance")): self.instance.destroyed_at = None for disk in self.instance.disks.all(): disk.destroyed = None disk.restore() disk.save() self.instance.status = 'PENDING' self.instance.save() try: self.instance.renew(parent_activity=activity) except: pass if self.instance.template: for net in self.instance.template.interface_set.all(): self.instance.add_interface( parent_activity=activity, user=user, vlan=net.vlan) @register_operation class ResourcesOperation(InstanceOperation): id = 'resources_change' name = _("resources change") description = _("Change resources of a stopped virtual machine.") acl_level = "owner" required_perms = ('vm.change_resources', ) accept_states = ('STOPPED', 'PENDING', ) def _operation(self, user, activity, num_cores, ram_size, max_ram_size, priority): self.instance.num_cores = num_cores self.instance.ram_size = ram_size self.instance.max_ram_size = max_ram_size self.instance.priority = priority self.instance.full_clean() self.instance.save() activity.result = create_readable(ugettext_noop( "Priority: %(priority)s, Num cores: %(num_cores)s, " "Ram size: %(ram_size)s"), priority=priority, num_cores=num_cores, ram_size=ram_size ) @register_operation class PasswordResetOperation(RemoteAgentOperation): id = 'password_reset' name = _("password reset") description = _("Generate and set a new login password on the virtual " "machine. This operation requires the agent running. " "Resetting the password is not warranted to allow you " "logging in as other settings are possible to prevent " "it.") acl_level = "owner" task = agent_tasks.change_password required_perms = () def _get_remote_args(self, password, **kwargs): return (super(PasswordResetOperation, self)._get_remote_args(**kwargs) + [password]) def _operation(self, password=None): if not password: password = pwgen() super(PasswordResetOperation, self)._operation(password=password) self.instance.pw = password self.instance.save() @register_operation class AgentStartedOperation(InstanceOperation): id = 'agent_started' name = _("agent") acl_level = "owner" required_perms = () concurrency_check = False @classmethod def get_activity_code_suffix(cls): return 'agent' @property def initialized(self): return self.instance.activity_log.filter( activity_code='vm.Instance.agent._cleanup').exists() def measure_boot_time(self): if not self.instance.template: return deploy_time = InstanceActivity.objects.filter( instance=self.instance, activity_code="vm.Instance.deploy" ).latest("finished").finished total_boot_time = (timezone.now() - deploy_time).total_seconds() Client().send([ "template.%(pk)d.boot_time %(val)f %(time)s" % { 'pk': self.instance.template.pk, 'val': total_boot_time, 'time': time.time(), } ]) def finish_agent_wait(self): for i in InstanceActivity.objects.filter( (Q(activity_code__endswith='.os_boot') | Q(activity_code__endswith='.agent_wait')), instance=self.instance, finished__isnull=True): i.finish(True) def _operation(self, user, activity, old_version=None, agent_system=None): with activity.sub_activity('starting', concurrency_check=False, readable_name=ugettext_noop('starting')): pass self.finish_agent_wait() self.instance._change_ip(parent_activity=activity) self.instance._restart_networking(parent_activity=activity) new_version = settings.AGENT_VERSION if new_version and old_version and new_version != old_version: try: self.instance.update_agent( parent_activity=activity, agent_system=agent_system) except TimeoutError: pass else: activity.sub_activity( 'agent_wait', readable_name=ugettext_noop( "wait agent restarting"), interruptible=True) return # agent is going to restart if not self.initialized: try: self.measure_boot_time() except: logger.exception('Unhandled error in measure_boot_time()') self.instance._cleanup(parent_activity=activity) self.instance.password_reset( parent_activity=activity, password=self.instance.pw) self.instance._set_time(parent_activity=activity) self.instance._set_hostname(parent_activity=activity) @register_operation class CleanupOperation(SubOperationMixin, RemoteAgentOperation): id = '_cleanup' name = _("cleanup") task = agent_tasks.cleanup @register_operation class SetTimeOperation(SubOperationMixin, RemoteAgentOperation): id = '_set_time' name = _("set time") task = agent_tasks.set_time def _get_remote_args(self, **kwargs): cls = AgentStartedOperation.SetTimeOperation return (super(cls, self)._get_remote_args(**kwargs) + [time.time()]) @register_operation class SetHostnameOperation(SubOperationMixin, RemoteAgentOperation): id = '_set_hostname' name = _("set hostname") task = agent_tasks.set_hostname def _get_remote_args(self, **kwargs): cls = AgentStartedOperation.SetHostnameOperation return (super(cls, self)._get_remote_args(**kwargs) + [self.instance.short_hostname]) @register_operation class RestartNetworkingOperation(SubOperationMixin, RemoteAgentOperation): id = '_restart_networking' name = _("restart networking") task = agent_tasks.restart_networking @register_operation class ChangeIpOperation(SubOperationMixin, RemoteAgentOperation): id = '_change_ip' name = _("change ip") task = agent_tasks.change_ip def _get_remote_args(self, **kwargs): hosts = Host.objects.filter(interface__instance=self.instance) interfaces = {str(host.mac): host.get_network_config() for host in hosts} cls = AgentStartedOperation.ChangeIpOperation return (super(cls, self)._get_remote_args(**kwargs) + [interfaces, settings.FIREWALL_SETTINGS['rdns_ip']]) @register_operation class UpdateAgentOperation(RemoteAgentOperation): id = 'update_agent' name = _("update agent") acl_level = "owner" required_perms = () def get_activity_name(self, kwargs): return create_readable( ugettext_noop('update agent to %(version)s'), version=settings.AGENT_VERSION) @staticmethod def create_linux_tar(): def exclude(tarinfo): ignored = ('./.', './misc', './windows') if any(tarinfo.name.startswith(x) for x in ignored): return None else: return tarinfo f = StringIO() with TarFile.open(fileobj=f, mode='w:gz') as tar: agent_path = os.path.join(settings.AGENT_DIR, "agent-linux") tar.add(agent_path, arcname='.', filter=exclude) version_fileobj = StringIO(settings.AGENT_VERSION) version_info = TarInfo(name='version.txt') version_info.size = len(version_fileobj.buf) tar.addfile(version_info, version_fileobj) return encodestring(f.getvalue()).replace('\n', '') @staticmethod def create_windows_tar(): f = StringIO() agent_path = os.path.join(settings.AGENT_DIR, "agent-win") with TarFile.open(fileobj=f, mode='w|gz') as tar: tar.add(agent_path, arcname='.') version_fileobj = StringIO(settings.AGENT_VERSION) version_info = TarInfo(name='version.txt') version_info.size = len(version_fileobj.buf) tar.addfile(version_info, version_fileobj) return encodestring(f.getvalue()).replace('\n', '') def _operation(self, user, activity, agent_system): queue = self._get_remote_queue() instance = self.instance if agent_system == "Windows": executable = os.listdir( os.path.join(settings.AGENT_DIR, "agent-win"))[0] data = self.create_windows_tar() elif agent_system == "Linux": executable = "" data = self.create_linux_tar() else: # Legacy update method executable = "" return agent_tasks.update_legacy.apply_async( queue=queue, args=(instance.vm_name, self.create_linux_tar()) ).get(timeout=60) checksum = md5(data).hexdigest() chunk_size = 1024 * 1024 chunk_number = 0 index = 0 filename = settings.AGENT_VERSION + ".tar" while True: chunk = data[index:index+chunk_size] if chunk: agent_tasks.append.apply_async( queue=queue, args=(instance.vm_name, chunk, filename, chunk_number)).get(timeout=60) index = index + chunk_size chunk_number = chunk_number + 1 else: agent_tasks.update.apply_async( queue=queue, args=(instance.vm_name, filename, executable, checksum) ).get(timeout=60) break @register_operation class MountStoreOperation(EnsureAgentMixin, InstanceOperation): id = 'mount_store' name = _("mount store") description = _( "This operation attaches your personal file store. Other users who " "have access to this machine can see these files as well." ) acl_level = "owner" required_perms = () def check_auth(self, user): super(MountStoreOperation, self).check_auth(user) try: Store(user) except NoStoreException: raise PermissionDenied # not show the button at all def _operation(self, user): inst = self.instance queue = self.instance.get_remote_queue_name("agent") host = urlsplit(settings.STORE_URL).hostname username = Store(user).username password = user.profile.smb_password agent_tasks.mount_store.apply_async( queue=queue, args=(inst.vm_name, host, username, password)) class AbstractDiskOperation(SubOperationMixin, RemoteInstanceOperation): required_perms = () def _get_remote_args(self, disk, **kwargs): return (super(AbstractDiskOperation, self)._get_remote_args(**kwargs) + [disk.get_vmdisk_desc()]) @register_operation class AttachDisk(AbstractDiskOperation): id = "_attach_disk" name = _("attach disk") task = vm_tasks.attach_disk class DetachMixin(object): def _operation(self, activity, **kwargs): try: super(DetachMixin, self)._operation(**kwargs) except Exception as e: if hasattr(e, "libvirtError") and "not found" in unicode(e): activity.result = create_readable( ugettext_noop("Resource was not found."), ugettext_noop("Resource was not found. %(exception)s"), exception=unicode(e)) else: raise @register_operation class DetachDisk(DetachMixin, AbstractDiskOperation): id = "_detach_disk" name = _("detach disk") task = vm_tasks.detach_disk class AbstractNetworkOperation(SubOperationMixin, RemoteInstanceOperation): required_perms = () def _get_remote_args(self, interface, **kwargs): return (super(AbstractNetworkOperation, self) ._get_remote_args(**kwargs) + [interface.get_vmnetwork_desc()]) @register_operation class AttachNetwork(AbstractNetworkOperation): id = "_attach_network" name = _("attach network") task = vm_tasks.attach_network @register_operation class DetachNetwork(DetachMixin, AbstractNetworkOperation): id = "_detach_network" name = _("detach network") task = vm_tasks.detach_network