operations.py 14.1 KB
Newer Older
1
from __future__ import absolute_import, unicode_literals
Dudás Ádám committed
2 3 4
from logging import getLogger
from string import ascii_lowercase

5
from django.core.exceptions import PermissionDenied
Dudás Ádám committed
6 7 8 9
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

from celery.exceptions import TimeLimitExceeded
10

11
from common.operations import Operation, register_operation
Dudás Ádám committed
12
from storage.models import Disk
13
from .tasks import vm_tasks
14 15 16 17
from .tasks.local_tasks import async_instance_operation, async_node_operation
from .models import (
    Instance, InstanceActivity, InstanceTemplate, Node, NodeActivity,
)
Dudás Ádám committed
18 19 20


logger = getLogger(__name__)
21 22


23
class InstanceOperation(Operation):
24
    acl_level = 'owner'
25
    async_operation = async_instance_operation
Dudás Ádám committed
26

27
    def __init__(self, instance):
28
        super(InstanceOperation, self).__init__(subject=instance)
29 30 31
        self.instance = instance

    def check_precond(self):
32 33
        if self.instance.destroyed_at:
            raise self.instance.InstanceDestroyedError(self.instance)
34 35

    def check_auth(self, user):
36 37 38 39
        if not self.instance.has_level(user, self.acl_level):
            raise PermissionDenied("%s doesn't have the required ACL level." %
                                   user)

40
        super(InstanceOperation, self).check_auth(user=user)
41

42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
    def create_activity(self, parent, user):
        if parent:
            if parent.instance != self.instance:
                raise ValueError("The instance associated with the specified "
                                 "parent activity does not match the instance "
                                 "bound to the operation.")
            if parent.user != user:
                raise ValueError("The user associated with the specified "
                                 "parent activity does not match the user "
                                 "provided as parameter.")

            return parent.create_sub(code_suffix=self.activity_code_suffix)
        else:
            return InstanceActivity.create(
                code_suffix=self.activity_code_suffix, instance=self.instance,
                user=user)
58

59

60 61
def register_instance_operation(op_cls, op_id=None):
    return register_operation(Instance, op_cls, op_id)
Dudás Ádám committed
62 63


64
class DeployOperation(InstanceOperation):
Dudás Ádám committed
65 66 67
    activity_code_suffix = 'deploy'
    id = 'deploy'
    name = _("deploy")
68
    description = _("Deploy new virtual machine with network.")
Dudás Ádám committed
69 70 71 72

    def on_commit(self, activity):
        activity.resultant_state = 'RUNNING'

73
    def _operation(self, activity, user, system):
Dudás Ádám committed
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
        self.instance._schedule_vm(activity)

        # Deploy virtual images
        with activity.sub_activity('deploying_disks'):
            devnums = list(ascii_lowercase)  # a-z
            for disk in self.instance.disks.all():
                # assign device numbers
                if disk.dev_num in devnums:
                    devnums.remove(disk.dev_num)
                else:
                    disk.dev_num = devnums.pop(0)
                    disk.save()
                # deploy disk
                disk.deploy()

        self.instance._deploy_vm(activity)


92
register_instance_operation(DeployOperation)
Dudás Ádám committed
93 94


95
class DestroyOperation(InstanceOperation):
Dudás Ádám committed
96 97 98
    activity_code_suffix = 'destroy'
    id = 'destroy'
    name = _("destroy")
99
    description = _("Destroy virtual machine and its networks.")
Dudás Ádám committed
100 101 102 103

    def on_commit(self, activity):
        activity.resultant_state = 'DESTROYED'

104
    def _operation(self, activity, user, system):
Dudás Ádám committed
105 106 107 108 109 110 111 112 113 114 115 116 117 118
        if self.instance.node:
            self.instance._destroy_vm(activity)

        # Destroy disks
        with activity.sub_activity('destroying_disks'):
            for disk in self.instance.disks.all():
                disk.destroy()

        self.instance._cleanup_after_destroy_vm(activity)

        self.instance.destroyed_at = timezone.now()
        self.instance.save()


119
register_instance_operation(DestroyOperation)
Dudás Ádám committed
120 121


122
class MigrateOperation(InstanceOperation):
Dudás Ádám committed
123 124 125
    activity_code_suffix = 'migrate'
    id = 'migrate'
    name = _("migrate")
126
    description = _("Live migrate running VM to another node.")
Dudás Ádám committed
127

128
    def _operation(self, activity, user, system, to_node=None, timeout=120):
Dudás Ádám committed
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
        if not to_node:
            with activity.sub_activity('scheduling') as sa:
                to_node = self.instance.select_node()
                sa.result = to_node

        # Destroy networks
        with activity.sub_activity('destroying_net'):
            for net in self.instance.interface_set.all():
                net.shutdown()

        with activity.sub_activity('migrate_vm'):
            queue_name = self.instance.get_remote_queue_name('vm')
            vm_tasks.migrate.apply_async(args=[self.instance.vm_name,
                                               to_node.host.hostname],
                                         queue=queue_name).get(timeout=timeout)
        # Refresh node information
        self.instance.node = to_node
        self.instance.save()
        # Estabilish network connection (vmdriver)
        with activity.sub_activity('deploying_net'):
            for net in self.instance.interface_set.all():
                net.deploy()


153
register_instance_operation(MigrateOperation)
Dudás Ádám committed
154 155


156
class RebootOperation(InstanceOperation):
Dudás Ádám committed
157 158 159
    activity_code_suffix = 'reboot'
    id = 'reboot'
    name = _("reboot")
160
    description = _("Reboot virtual machine with Ctrl+Alt+Del signal.")
Dudás Ádám committed
161

162
    def _operation(self, activity, user, system, timeout=5):
Dudás Ádám committed
163 164 165 166 167
        queue_name = self.instance.get_remote_queue_name('vm')
        vm_tasks.reboot.apply_async(args=[self.instance.vm_name],
                                    queue=queue_name).get(timeout=timeout)


168
register_instance_operation(RebootOperation)
Dudás Ádám committed
169 170


171
class ResetOperation(InstanceOperation):
Dudás Ádám committed
172 173 174
    activity_code_suffix = 'reset'
    id = 'reset'
    name = _("reset")
175
    description = _("Reset virtual machine (reset button).")
Dudás Ádám committed
176

177
    def _operation(self, activity, user, system, timeout=5):
Dudás Ádám committed
178 179 180 181 182
        queue_name = self.instance.get_remote_queue_name('vm')
        vm_tasks.reset.apply_async(args=[self.instance.vm_name],
                                   queue=queue_name).get(timeout=timeout)


183
register_instance_operation(ResetOperation)
Dudás Ádám committed
184 185


186
class SaveAsTemplateOperation(InstanceOperation):
Dudás Ádám committed
187 188 189 190 191 192 193 194 195
    activity_code_suffix = 'save_as_template'
    id = 'save_as_template'
    name = _("save as template")
    description = _("""Save Virtual Machine as a Template.

        Template can be shared with groups and users.
        Users can instantiate Virtual Machines from Templates.
        """)

196 197 198 199 200
    def _operation(self, activity, name, user, system, timeout=300,
                   with_shutdown=True, **kwargs):
        if with_shutdown:
            ShutdownOperation(self.instance).call(parent_activity=activity,
                                                  user=user)
Dudás Ádám committed
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
        # prepare parameters
        params = {
            'access_method': self.instance.access_method,
            'arch': self.instance.arch,
            'boot_menu': self.instance.boot_menu,
            'description': self.instance.description,
            'lease': self.instance.lease,  # Can be problem in new VM
            'max_ram_size': self.instance.max_ram_size,
            'name': name,
            'num_cores': self.instance.num_cores,
            'owner': user,
            'parent': self.instance.template,  # Can be problem
            'priority': self.instance.priority,
            'ram_size': self.instance.ram_size,
            'raw_data': self.instance.raw_data,
            'system': self.instance.system,
        }
        params.update(kwargs)

        def __try_save_disk(disk):
            try:
                return disk.save_as()  # can do in parallel
            except Disk.WrongDiskTypeError:
                return disk

        # create template and do additional setup
        tmpl = InstanceTemplate(**params)
        tmpl.full_clean()  # Avoiding database errors.
        tmpl.save()
        try:
            with activity.sub_activity('saving_disks'):
                tmpl.disks.add(*[__try_save_disk(disk)
                                 for disk in self.instance.disks.all()])
            # create interface templates
            for i in self.instance.interface_set.all():
                i.save_as_template(tmpl)
        except:
            tmpl.delete()
            raise
        else:
            return tmpl


244
register_instance_operation(SaveAsTemplateOperation)
Dudás Ádám committed
245 246


247
class ShutdownOperation(InstanceOperation):
Dudás Ádám committed
248 249 250
    activity_code_suffix = 'shutdown'
    id = 'shutdown'
    name = _("shutdown")
251
    description = _("Shutdown virtual machine with ACPI signal.")
Dudás Ádám committed
252 253 254 255 256 257 258 259 260 261

    def on_abort(self, activity, error):
        if isinstance(error, TimeLimitExceeded):
            activity.resultant_state = None
        else:
            activity.resultant_state = 'ERROR'

    def on_commit(self, activity):
        activity.resultant_state = 'STOPPED'

262
    def _operation(self, activity, user, system, timeout=120):
Dudás Ádám committed
263
        queue_name = self.instance.get_remote_queue_name('vm')
264 265
        logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name,
                     self.instance.vm_name)
Dudás Ádám committed
266 267 268 269 270 271 272
        vm_tasks.shutdown.apply_async(kwargs={'name': self.instance.vm_name},
                                      queue=queue_name).get(timeout=timeout)
        self.instance.node = None
        self.instance.vnc_port = None
        self.instance.save()


273
register_instance_operation(ShutdownOperation)
Dudás Ádám committed
274 275


276
class ShutOffOperation(InstanceOperation):
Dudás Ádám committed
277 278 279
    activity_code_suffix = 'shut_off'
    id = 'shut_off'
    name = _("shut off")
280
    description = _("Shut off VM (plug-out).")
Dudás Ádám committed
281 282 283 284

    def on_commit(activity):
        activity.resultant_state = 'STOPPED'

285
    def _operation(self, activity, user, system):
Dudás Ádám committed
286 287 288 289 290 291 292 293
        # Destroy VM
        if self.instance.node:
            self.instance._destroy_vm(activity)

        self.instance._cleanup_after_destroy_vm(activity)
        self.instance.save()


294
register_instance_operation(ShutOffOperation)
Dudás Ádám committed
295 296


297
class SleepOperation(InstanceOperation):
Dudás Ádám committed
298 299 300
    activity_code_suffix = 'sleep'
    id = 'sleep'
    name = _("sleep")
301
    description = _("Suspend virtual machine with memory dump.")
Dudás Ádám committed
302 303

    def check_precond(self):
304
        super(SleepOperation, self).check_precond()
Dudás Ádám committed
305 306 307 308 309 310 311 312 313 314 315 316
        if self.instance.status not in ['RUNNING']:
            raise self.instance.WrongStateError(self.instance)

    def on_abort(self, activity, error):
        if isinstance(error, TimeLimitExceeded):
            activity.resultant_state = None
        else:
            activity.resultant_state = 'ERROR'

    def on_commit(self, activity):
        activity.resultant_state = 'SUSPENDED'

317
    def _operation(self, activity, user, system, timeout=60):
Dudás Ádám committed
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
        # Destroy networks
        with activity.sub_activity('destroying_net'):
            for net in self.instance.interface_set.all():
                net.shutdown()

        # Suspend vm
        with activity.sub_activity('suspending'):
            queue_name = self.instance.get_remote_queue_name('vm')
            vm_tasks.sleep.apply_async(args=[self.instance.vm_name,
                                             self.instance.mem_dump['path']],
                                       queue=queue_name).get(timeout=timeout)
            self.instance.node = None
            self.instance.save()


333
register_instance_operation(SleepOperation)
Dudás Ádám committed
334 335


336
class WakeUpOperation(InstanceOperation):
Dudás Ádám committed
337 338 339 340 341 342 343 344 345
    activity_code_suffix = 'wake_up'
    id = 'wake_up'
    name = _("wake up")
    description = _("""Wake up Virtual Machine from SUSPENDED state.

        Power on Virtual Machine and load its memory from dump.
        """)

    def check_precond(self):
346
        super(WakeUpOperation, self).check_precond()
Dudás Ádám committed
347 348 349 350 351 352 353 354 355
        if self.instance.status not in ['SUSPENDED']:
            raise self.instance.WrongStateError(self.instance)

    def on_abort(self, activity, error):
        activity.resultant_state = 'ERROR'

    def on_commit(self, activity):
        activity.resultant_state = 'RUNNING'

356
    def _operation(self, activity, user, system, timeout=60):
Dudás Ádám committed
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
        # Schedule vm
        self.instance._schedule_vm(activity)
        queue_name = self.instance.get_remote_queue_name('vm')

        # Resume vm
        with activity.sub_activity('resuming'):
            vm_tasks.wake_up.apply_async(args=[self.instance.vm_name,
                                               self.instance.mem_dump['path']],
                                         queue=queue_name).get(timeout=timeout)

        # Estabilish network connection (vmdriver)
        with activity.sub_activity('deploying_net'):
            for net in self.instance.interface_set.all():
                net.deploy()

        # Renew vm
        self.instance.renew(which='both', base_activity=activity)


376
register_instance_operation(WakeUpOperation)
377 378 379 380 381 382 383 384 385


class NodeOperation(Operation):
    async_operation = async_node_operation

    def __init__(self, node):
        super(NodeOperation, self).__init__(subject=node)
        self.node = node

386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
    def create_activity(self, parent, user):
        if parent:
            if parent.node != self.node:
                raise ValueError("The node associated with the specified "
                                 "parent activity does not match the node "
                                 "bound to the operation.")
            if parent.user != user:
                raise ValueError("The user associated with the specified "
                                 "parent activity does not match the user "
                                 "provided as parameter.")

            return parent.create_sub(code_suffix=self.activity_code_suffix)
        else:
            return NodeActivity.create(code_suffix=self.activity_code_suffix,
                                       node=self.node, user=user)
401 402 403 404 405 406 407 408 409 410


def register_node_operation(op_cls, op_id=None):
    return register_operation(Node, op_cls, op_id)


class FlushOperation(NodeOperation):
    activity_code_suffix = 'flush'
    id = 'flush'
    name = _("flush")
411
    description = _("Disable node and move all instances to other ones.")
412 413 414 415 416 417 418 419 420

    def _operation(self, activity, user, system):
        self.node.disable(user, activity)
        for i in self.node.instance_set.all():
            with activity.sub_activity('migrate_instance_%d' % i.pk):
                i.migrate()


register_node_operation(FlushOperation)