models.py 20.4 KB
Newer Older
1 2
# -*- coding: utf-8 -*-

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

20
from __future__ import unicode_literals
21 22

import logging
23
from os.path import join
24
import uuid
25
import re
26

Guba Sándor committed
27 28
from celery.contrib.abortable import AbortableAsyncResult
from django.db.models import (Model, BooleanField, CharField, DateTimeField,
29
                              ForeignKey)
30
from django.core.exceptions import ObjectDoesNotExist
31
from django.core.urlresolvers import reverse
32
from django.utils import timezone
33
from django.utils.translation import ugettext_lazy as _, ugettext_noop
34
from model_utils.models import TimeStampedModel
35
from sizefield.models import FileSizeField
36

Guba Sándor committed
37
from .tasks import local_tasks, storage_tasks
38
from celery.exceptions import TimeoutError
39
from common.models import (
40
    WorkerNotFound, HumanReadableException, humanize_exception, method_cache
41
)
42 43 44 45

logger = logging.getLogger(__name__)


46
class DataStore(Model):

    """Collection of virtual disks.

    A datastore is identified by a host name and a path.  Remote storage
    tasks for it are sent to queues named
    ``<hostname>.<queue_id>[.<priority>]``.
    """
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))

    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

    def get_remote_queue_name(self, queue_id, priority=None,
                              check_worker=True):
        """Return the name of the remote queue serving this datastore.

        :param queue_id: middle part of the queue name (e.g. 'storage').
        :param priority: optional suffix of the queue name.
        :param check_worker: when true, ``local_tasks.check_queue`` is
                             consulted before the name is returned.

        :return: ``<hostname>.<queue_id>`` with ``.<priority>`` appended
                 when a priority is given.
        :raises WorkerNotFound: if ``check_worker`` is true and
                                ``local_tasks.check_queue`` returns a false
                                value.
        """
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
        if check_worker and not local_tasks.check_queue(self.hostname,
                                                        queue_id,
                                                        priority):
            raise WorkerNotFound()

        queue_name = self.hostname + '.' + queue_id
        if priority is not None:
            queue_name = queue_name + '.' + priority
        return queue_name

    def get_deletable_disks(self):
        """Return the file names of the destroyed disks of this datastore
        whose ``is_deletable`` property is true.
        """
        return [disk.filename for disk in
                self.disk_set.filter(
                    destroyed__isnull=False) if disk.is_deletable]

    def _list_remote_files(self, timeout):
        """Return the set of file names the storage worker lists for the
        path of this datastore.
        """
        queue_name = self.get_remote_queue_name('storage', "slow")
        return set(storage_tasks.list_files.apply_async(
            args=[self.path], queue=queue_name).get(timeout=timeout))

    @method_cache(30)
    def get_statistics(self, timeout=15):
        """Return the result of the remote ``get_storage_stat`` task for
        the path of this datastore.
        """
        q = self.get_remote_queue_name("storage", priority="fast")
        return storage_tasks.get_storage_stat.apply_async(
            args=[self.path], queue=q).get(timeout=timeout)

    @method_cache(30)
    def get_orphan_disks(self, timeout=25):
        """Disk image files without Disk object in the database.

        Files whose name starts like ``cloud-<digits>.dump`` are not
        reported.
        """
        files = self._list_remote_files(timeout)
        known = set(disk.filename for disk in self.disk_set.all())
        # Raw string: '\.' must reach the regex engine as an escaped dot.
        dump_pattern = re.compile(r'cloud-[0-9]*\.dump')
        return [name for name in files - known
                if not dump_pattern.match(name)]

    @method_cache(30)
    def get_missing_disks(self, timeout=25):
        """Disk objects without disk image files.

        Only the ready, not destroyed disks of *this* datastore are
        considered; disks stored elsewhere can not be expected to show up
        in the file list of this datastore.
        """
        files = self._list_remote_files(timeout)
        disks = self.disk_set.filter(destroyed__isnull=True, is_ready=True)
        return disks.exclude(filename__in=files)

    @method_cache(120)
    def get_file_statistics(self, timeout=90):
        """Return the result of the remote ``get_file_statistics`` task for
        the path of this datastore.
        """
        queue_name = self.get_remote_queue_name('storage', "slow")
        data = storage_tasks.get_file_statistics.apply_async(
            args=[self.path], queue=queue_name).get(timeout=timeout)
        return data

120

Bach Dániel committed
121
class Disk(TimeStampedModel):
Guba Sándor committed
122

123 124 125 126
    """A virtual disk.
    """
    # Choices of ``type``: (stored value, label).
    TYPES = [
        ('qcow2-norm', 'qcow2 normal'),
        ('qcow2-snap', 'qcow2 snapshot'),
        ('iso', 'iso'),
        ('raw-ro', 'raw read-only'),
        ('raw-rw', 'raw'),
    ]
    # Choices of ``bus``: (stored value, label).
    BUS_TYPES = (
        ('virtio', 'virtio'),
        ('ide', 'ide'),
        ('scsi', 'scsi'),
    )
    # Image formats offered for export: (format name, translated label).
    EXPORT_FORMATS = (
        ('vmdk', _('VMware disk image')),
        ('qcow2', _('QEMU disk image')),
        ('vdi', _('VirtualBox disk image')),
        ('vpc', _('HyperV disk image')),
    )

    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
    # Name of the image file inside the datastore; see the ``path`` property.
    filename = CharField(
        max_length=256, unique=True, verbose_name=_("filename"))
    datastore = ForeignKey(
        DataStore, verbose_name=_("datastore"),
        help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
    # Optional explicit bus; ``device_bus`` falls back to a per-type default.
    bus = CharField(
        max_length=10, choices=BUS_TYPES, null=True, blank=True,
        default=None)
    size = FileSizeField(null=True, default=None)
    # The disk this one was derived from; reverse accessor: ``derivatives``.
    base = ForeignKey(
        'self', blank=True, null=True, related_name='derivatives')
    dev_num = CharField(
        default='a', max_length=1, verbose_name=_("device number"))
    # Time of destruction; None while the disk is not destroyed.
    destroyed = DateTimeField(blank=True, default=None, null=True)

    # Set once the image has been created by deploy/download/save_as.
    is_ready = BooleanField(default=False)

149 150 151 152
    class Meta:
        # Listing order and the displayed names of the model.
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')
        # Custom permissions of the disk operations: (codename, label).
        permissions = (
            ('create_empty_disk', _('Can create an empty disk.')),
            ('download_disk', _('Can download a disk.')),
            ('resize_disk', _('Can resize a disk.')),
            ('import_disk', _('Can import a disk.')),
            ('export_disk', _('Can export a disk.')),
        )
160

161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
    class DiskError(HumanReadableException):
        """Base class of the disk related human readable errors.

        Subclasses provide ``message`` and optionally ``admin_message``;
        when ``admin_message`` is empty, ``message`` is used for both.
        """
        admin_message = None

        def __init__(self, disk, params=None, level=None, **kwargs):
            """Store the disk and pass the messages and their parameters on
            to the base exception.

            :param disk: the disk the error is about.
            :param params: optional dict merged into the message parameters.
            :param level: passed through as the first argument of the base
                          class initializer.
            :param kwargs: further message parameters.
            """
            kwargs.update(params or {})
            kwargs["disk"] = disk
            self.disk = disk
            # The attribute used to be available only under this misspelled
            # name; it is kept as an alias so existing users keep working.
            self.disc = disk
            super(Disk.DiskError, self).__init__(
                level, self.message, self.admin_message or self.message,
                kwargs)

    class WrongDiskTypeError(DiskError):
        """Raised when an operation does not support the type of the disk."""
        message = ugettext_noop("Operation can't be invoked on disk "
                                "'%(name)s' of type '%(type)s'.")

        admin_message = ugettext_noop(
            "Operation can't be invoked on disk "
            "'%(name)s' (%(pk)s) of type '%(type)s'.")

        def __init__(self, disk, params=None, **kwargs):
            # Extra keyword arguments are accepted but not forwarded.
            details = {'type': disk.type, 'name': disk.name, 'pk': disk.pk}
            super(Disk.WrongDiskTypeError, self).__init__(
                disk, params, **details)

    class DiskInUseError(DiskError):
        """Raised when an operation is refused because the disk is in use."""
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because it is in use.")

        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) because it is in use.")

        def __init__(self, disk, params=None, **kwargs):
            # Extra keyword arguments are accepted but not forwarded.
            details = {'name': disk.name, 'pk': disk.pk}
            super(Disk.DiskInUseError, self).__init__(
                disk, params, **details)

    class DiskIsNotReady(DiskError):
        """Raised when an operation needs a disk that is already deployed,
        but the disk is not ready.
        """
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because it has never been deployed.")

        # Note the trailing space after "been": the literals are joined
        # without any separator.
        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) [%(filename)s] because it has never "
            "been deployed.")

        def __init__(self, disk, params=None, **kwargs):
            # Extra keyword arguments are accepted but not forwarded.
            super(Disk.DiskIsNotReady, self).__init__(
                disk, params, name=disk.name, pk=disk.pk,
                filename=disk.filename)

    class DiskBaseIsNotReady(DiskError):
        """Raised when an operation needs the base of the disk to be
        deployed, but the base is not ready.
        """
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because its base has never been deployed.")

        # Note the trailing space after "been": the literals are joined
        # without any separator.
        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) [%(filename)s] because its base "
            "'%(b_name)s' (%(b_pk)s) [%(b_filename)s] has never been "
            "deployed.")

        def __init__(self, disk, params=None, **kwargs):
            """:param base: optional keyword argument, the base disk that is
                         not ready; defaults to ``disk.base``.
            """
            # Without the fallback a missing ``base`` keyword ended in an
            # AttributeError on None instead of the intended error.
            base = kwargs.get('base') or disk.base
            super(Disk.DiskBaseIsNotReady, self).__init__(
                disk, params, name=disk.name, pk=disk.pk,
                filename=disk.filename, b_name=base.name,
                b_pk=base.pk, b_filename=base.filename)
Guba Sándor committed
228

229 230
    @property
    def path(self):
        """Location of the image file: the path of the datastore joined
        with the file name of the disk.
        """
        directory = self.datastore.path
        return join(directory, self.filename)
234 235

    @property
    def vm_format(self):
        """Returns the file format reported to the vmdriver for the type of
        the disk; iso images are reported as raw here.

        Raises KeyError for a type missing from the table.
        """
        formats = dict.fromkeys(('qcow2-norm', 'qcow2-snap'), 'qcow2')
        formats.update(dict.fromkeys(('iso', 'raw-ro', 'raw-rw'), 'raw'))
        return formats[self.type]

247
    @property
    def format(self):
        """Returns the file format reported to the storage driver for the
        type of the disk; unlike ``vm_format``, iso stays 'iso'.

        Raises KeyError for a type missing from the table.
        """
        formats = dict.fromkeys(('qcow2-norm', 'qcow2-snap'), 'qcow2')
        formats.update(dict.fromkeys(('raw-ro', 'raw-rw'), 'raw'))
        formats['iso'] = 'iso'
        return formats[self.type]

    @property
    def device_type(self):
        """Returns the device name prefix for the type of the disk: 'sd'
        for iso images and 'vd' for every other known type.

        Raises KeyError for a type missing from the table.
        """
        prefixes = dict.fromkeys(
            ('qcow2-norm', 'qcow2-snap', 'raw-ro', 'raw-rw'), 'vd')
        prefixes['iso'] = 'sd'
        return prefixes[self.type]
270

271
    @property
    def device_bus(self):
        """Returns the bus of the disk: the explicitly set ``bus`` when
        there is one, otherwise a default chosen by the type of the disk
        ('ide' for iso images, 'virtio' for the other known types).
        """
        explicit_bus = self.bus
        if explicit_bus:
            return explicit_bus

        default_buses = dict.fromkeys(
            ('qcow2-norm', 'qcow2-snap', 'raw-ro', 'raw-rw'), 'virtio')
        default_buses['iso'] = 'ide'
        return default_buses[self.type]

    @property
    def is_deletable(self):
        """True if the associated file can be deleted, i.e. the disk itself
        is destroyed and all of its derivatives are deletable.
        """
        if self.destroyed is None:
            # A disk that is not destroyed must keep its file.
            return False
        return self.children_deletable
291

292 293 294
    @property
    def children_deletable(self):
        """True if every disk derived from this one is deletable (also true
        when there are no derivatives).
        """
        for child in self.derivatives.all():
            if not child.is_deletable:
                return False
        return True
297

298
    @property
    def is_in_use(self):
        """True if disk is attached to an active VM.

        The disk counts as used when any instance in ``instance_set`` has a
        state other than STOPPED.
        """
        for instance in self.instance_set.all():
            if instance.state != 'STOPPED':
                return True
        return False
306

307
    def get_appliance(self):
        """Return the template or, when the disk has no template, the
        instance that uses the disk; None if there is no such object.
        """
        try:
            templates = self.template_set.all()
            if templates:
                return templates.get()
            return self.instance_set.all().get()
        except ObjectDoesNotExist:
            return None
316

317 318
    def get_exclusive(self):
        """Create and return a new disk based on this one, for exclusive
        usage.

        Only a database record is created here.  The type of the new disk
        is derived from the type of this one; types without a mapping are
        rejected with WrongDiskTypeError.
        """
        exclusive_types = {
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
        }

        try:
            new_type = exclusive_types[self.type]
        except KeyError:
            raise self.WrongDiskTypeError(self)

        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type, dev_num=self.dev_num)
336 337

    def get_vmdisk_desc(self):
        """Build the dict that describes the disk to the vmdriver.
        """
        if self.type == 'iso':
            disk_device = 'cdrom'
        else:
            disk_device = 'disk'

        return dict(
            source=self.path,
            driver_type=self.vm_format,
            driver_cache='none',
            target_device=self.device_type + self.dev_num,
            target_bus=self.device_bus,
            disk_device=disk_device,
        )

349
    def get_disk_desc(self):
        """Build the dict that describes the disk to the storage driver.

        A disk with a base is described as a snapshot of that base.
        """
        base = self.base
        if base:
            base_name, kind = base.filename, 'snapshot'
        else:
            base_name, kind = None, 'normal'

        return dict(
            name=self.filename,
            dir=self.datastore.path,
            format=self.format,
            size=self.size,
            base_name=base_name,
            type=kind,
        )

361 362
    def get_remote_queue_name(self, queue_id='storage', priority=None,
                              check_worker=True):
        """Ask the datastore of the disk for the name of the remote queue;
        returns None when the disk has no datastore.
        """
        if not self.datastore:
            return None
        return self.datastore.get_remote_queue_name(queue_id, priority,
                                                    check_worker)

371
    def __unicode__(self):
        # An unsaved disk has no id yet; it is shown as #0.
        identifier = self.id or 0
        return u"%s (#%d)" % (self.name, identifier)
373

374
    def clean(self, *args, **kwargs):
        """Inherit the size of the base when the disk has no size of its
        own, then run the validation of the parent class.
        """
        # The former test, ``(self.size is None or "")``, never looked at the
        # empty string: it reduced to ``self.size is None``.
        if self.size in (None, "") and self.base:
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

379
    def deploy(self, user=None, task_uuid=None, timeout=15):
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

        :param user: The user who's issuing the command (not used here).
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously (not used here).
        :type task_uuid: str

        :param timeout: seconds to wait for the remote storage task.

        :return: True, both when the disk was already ready and when it has
                 just been created.
        :rtype: bool

        :raises DiskBaseIsNotReady: if the disk has a base that is not
                                    ready.
        """
        if self.destroyed:
            # Deploying clears the destroyed mark of the disk.
            self.destroyed = None
            self.save()

        if self.is_ready:
            return True

        base = self.base
        if base and not base.is_ready:
            raise self.DiskBaseIsNotReady(self, base=base)

        queue_name = self.get_remote_queue_name('storage', priority="fast")
        disk_desc = self.get_disk_desc()
        # Disks with a base are created as snapshots, the others from
        # scratch.
        if self.base is not None:
            remote_task = storage_tasks.snapshot
        else:
            remote_task = storage_tasks.create
        remote_task.apply_async(args=[disk_desc],
                                queue=queue_name).get(timeout=timeout)

        self.is_ready = True
        self.save()
        return True
418

419
    @classmethod
    def create(cls, user=None, **params):
        """Build a disk from ``params``, validate it with ``clean()``, save
        it and return it.
        """
        new_disk = cls.__create(user, params)
        new_disk.clean()
        new_disk.save()
        base_repr = unicode(params.get("base", "nobase"))
        logger.debug(u"Disk created from: %s", base_repr)
        return new_disk
427

428
    @classmethod
    def __create(cls, user, params):
        """Instantiate (but do not save) a disk from ``params``.

        :param user: not used here.
        :param params: dict of model field values; 'datastore' and
                       'filename' are removed from it.  A missing
                       datastore defaults to the only DataStore object, a
                       missing file name to a fresh UUID.
        :return: the unsaved Disk object.
        """
        # The default must be looked up lazily: as a dict.pop() default the
        # query ran (and could fail when there is not exactly one DataStore)
        # even if the caller had supplied a datastore.
        if 'datastore' in params:
            datastore = params.pop('datastore')
        else:
            datastore = DataStore.objects.get()
        filename = params.pop('filename', str(uuid.uuid4()))
        disk = cls(filename=filename, datastore=datastore, **params)
        return disk
434 435

    @classmethod
    def download(cls, url, task, user=None, **params):
        """Create disk object and download data from url synchronously.

        :param url: image url to download.
        :type url: url
        :param task: the local task running the download; when given, its
                     request id is sent to the remote task as ``parent_id``
                     and its ``is_aborted()`` is polled to cancel the
                     download.  May be None.
        :param user: owner of the disk
        :type user: django.contrib.auth.User
        :param params: further field values of the new disk; ``name``
                       defaults to the last segment of the url, ``type`` to
                       'iso' and ``size`` to None.

        :return: The created Disk object
        :rtype: Disk
        """
        params.setdefault('name', url.split('/')[-1])
        params.setdefault('type', 'iso')
        params.setdefault('size', None)
        disk = cls.__create(params=params, user=user)
        queue_name = disk.get_remote_queue_name('storage', priority='slow')
        # ``task`` is optional everywhere else in this method, so it must
        # not be dereferenced unconditionally here either.
        parent_id = task.request.id if task is not None else None
        remote = storage_tasks.download.apply_async(
            kwargs={'url': url, 'parent_id': parent_id,
                    'disk': disk.get_disk_desc()},
            queue=queue_name)
        # Wait in short slices so that an abort request is noticed.
        while True:
            try:
                result = remote.get(timeout=5)
                break
            except TimeoutError as e:
                if task is not None and task.is_aborted():
                    AbortableAsyncResult(remote.id).abort()
                    raise humanize_exception(ugettext_noop(
                        "Operation aborted by user."), e)
        disk.size = result['size']
        disk.type = result['type']
        disk.checksum = result.get('checksum', None)
        disk.is_ready = True
        disk.save()
        return disk
475

476 477 478
    def export(self, format, upload_link, timeout=600):
        """Run the remote export task for the disk and wait for it.

        :param format: passed to the export task as the target format.
        :param upload_link: passed to the export task.
        :param timeout: seconds to wait for the remote task.

        The exported image is named after the disk, or after its file name
        when the name is empty.
        """
        if self.name == '':
            exported_name = self.filename
        else:
            exported_name = self.name

        queue_name = self.get_remote_queue_name('storage', priority='slow')
        task_args = [self.get_disk_desc(), format, exported_name,
                     upload_link]
        storage_tasks.export.apply_async(
            args=task_args, queue=queue_name).get(timeout=timeout)
483

484
    def destroy(self, user=None, task_uuid=None):
        """Mark the disk as destroyed by storing the current time.

        Only the database record is changed by this method.

        :return: False if the disk was already destroyed, True otherwise.
        """
        already_destroyed = bool(self.destroyed)
        if already_destroyed:
            return False

        self.destroyed = timezone.now()
        self.save()
        return True
491

492
    def restore(self, user=None, task_uuid=None, timeout=15):
        """Recover destroyed disk from trash if possible.

        Runs the remote ``recover_from_trash`` task for the image file and
        waits for it.  The ``destroyed`` field is not modified here.

        :param timeout: seconds to wait for the remote task.
        """
        queue_name = self.datastore.get_remote_queue_name(
            'storage', priority='slow')
        storage_tasks.recover_from_trash.apply_async(
            args=[self.datastore.path, self.filename],
            queue=queue_name).get(timeout=timeout)
        # Report the recovery only once the remote task has finished without
        # an error; the arguments are handed over for lazy formatting.
        logger.info("Image: %s at Datastore: %s recovered from trash.",
                    self.filename, self.datastore.path)
502

503
    def save_as(self, task=None, user=None, task_uuid=None, timeout=300):
        """Create a new disk from this one with the remote merge task.

        Based on disk type:
        qcow2-norm, qcow2-snap --> qcow2-norm
        iso                    --> iso (with base)

        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.

        :param task: the local task running the operation; when given, its
                     request id is sent to the remote task as ``parent_id``
                     and its ``is_aborted()`` is polled to cancel the merge.

        :return: the new, ready Disk object.
        :raises WrongDiskTypeError: if the type of the disk is not listed
                                    above.
        :raises DiskInUseError: if the disk is in use.
        :raises DiskIsNotReady: if the disk is not ready.
        """
        mapping = {
            'qcow2-snap': ('qcow2-norm', None),
            'qcow2-norm': ('qcow2-norm', None),
            'iso': ("iso", self),
        }
        if self.type not in mapping:
            raise self.WrongDiskTypeError(self)

        if self.is_in_use:
            raise self.DiskInUseError(self)

        if not self.is_ready:
            raise self.DiskIsNotReady(self)

        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

        new_type, new_base = mapping[self.type]

        # Look for the worker before anything is written to the database, so
        # a missing worker does not leave a never-ready disk record behind.
        queue_name = self.get_remote_queue_name("storage", priority="slow")

        disk = Disk.create(datastore=self.datastore,
                           base=new_base,
                           name=self.name, size=self.size,
                           type=new_type, dev_num=self.dev_num)

        # ``task`` defaults to None, so it can not be dereferenced blindly.
        parent_id = task.request.id if task is not None else None
        remote = storage_tasks.merge.apply_async(kwargs={
            "old_json": self.get_disk_desc(),
            "new_json": disk.get_disk_desc(),
            "parent_id": parent_id},
            queue=queue_name
        )  # Timeout
        # Wait in short slices so that an abort request is noticed.
        while True:
            try:
                remote.get(timeout=5)
                break
            except TimeoutError as e:
                if task is not None and task.is_aborted():
                    AbortableAsyncResult(remote.id).abort()
                    disk.destroy()
                    raise humanize_exception(ugettext_noop(
                        "Operation aborted by user."), e)
            except:
                # Whatever went wrong, mark the half-made disk as destroyed
                # before the error is passed on.
                disk.destroy()
                raise
        disk.is_ready = True
        disk.save()
        return disk
560 561 562

    def get_absolute_url(self):
        """Return the URL of the 'dashboard.views.disk-detail' view for
        the disk.
        """
        url_kwargs = {'pk': self.pk}
        return reverse('dashboard.views.disk-detail', kwargs=url_kwargs)
563 564 565

    @property
    def is_resizable(self):
        """True if the type of the disk is one of the resizable types."""
        resizable_types = ('qcow2-norm', 'raw-rw', 'qcow2-snap')
        return self.type in resizable_types
567 568 569 570

    @property
    def is_exportable(self):
        """True if the type of the disk is one of the exportable types."""
        exportable_types = ('qcow2-norm', 'qcow2-snap', 'raw-rw', 'raw-ro')
        return self.type in exportable_types