models.py 19.8 KB
Newer Older
1 2
# -*- coding: utf-8 -*-

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

20
from __future__ import unicode_literals
21 22

import logging
23
from os.path import join
24
import uuid
25
import re
26

Guba Sándor committed
27 28
from celery.contrib.abortable import AbortableAsyncResult
from django.db.models import (Model, BooleanField, CharField, DateTimeField,
29
                              ForeignKey)
30
from django.core.exceptions import ObjectDoesNotExist
31
from django.core.urlresolvers import reverse
32
from django.utils import timezone
33
from django.utils.translation import ugettext_lazy as _, ugettext_noop
34
from model_utils.models import TimeStampedModel
35
from sizefield.models import FileSizeField
36

Guba Sándor committed
37
from .tasks import local_tasks, storage_tasks
38
from celery.exceptions import TimeoutError
39
from common.models import (
40
    WorkerNotFound, HumanReadableException, humanize_exception, method_cache
41
)
42 43 44 45

logger = logging.getLogger(__name__)


46
class DataStore(Model):
Guba Sándor committed
47

48 49
    """Collection of virtual disks.
    """
50 51 52 53
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))
Guba Sándor committed
54

55 56 57 58 59 60 61 62
    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

63 64
    def get_remote_queue_name(self, queue_id, priority=None,
                              check_worker=True):
65 66
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
67
        if not check_worker or local_tasks.check_queue(self.hostname,
Guba Sándor committed
68 69 70 71 72 73
                                                       queue_id,
                                                       priority):
            queue_name = self.hostname + '.' + queue_id
            if priority is not None:
                queue_name = queue_name + '.' + priority
            return queue_name
74 75
        else:
            raise WorkerNotFound()
76

77 78 79
    def get_deletable_disks(self):
        return [disk.filename for disk in
                self.disk_set.filter(
80
                    destroyed__isnull=False) if disk.is_deletable]
81

82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
    @method_cache(30)
    def get_statistics(self, timeout=15):
        q = self.get_remote_queue_name("storage", priority="fast")
        return storage_tasks.get_storage_stat.apply_async(
            args=[self.path], queue=q).get(timeout=timeout)

    @method_cache(30)
    def get_orphan_disks(self, timeout=15):
        """Disk image files without Disk object in the database.
        """
        queue_name = self.get_remote_queue_name('storage', "slow")
        files = set(storage_tasks.list_files.apply_async(
            args=[self.path], queue=queue_name).get(timeout=timeout))
        disks = set([disk.filename for disk in self.disk_set.all()])

        orphans = []
        for i in files - disks:
            if not re.match('cloud-[0-9]*\.dump', i):
                orphans.append(i)
        return orphans

    @method_cache(30)
    def get_missing_disks(self, timeout=15):
        """Disk objects without disk image files.
        """
        queue_name = self.get_remote_queue_name('storage', "slow")
        files = set(storage_tasks.list_files.apply_async(
            args=[self.path], queue=queue_name).get(timeout=timeout))
        disks = Disk.objects.filter(destroyed__isnull=True, is_ready=True)
        return disks.exclude(filename__in=files)

113 114
    @method_cache(120)
    def get_file_statistics(self, timeout=30):
115 116 117 118 119
        queue_name = self.get_remote_queue_name('storage', "slow")
        data = storage_tasks.get_file_statistics.apply_async(
            args=[self.path], queue=queue_name).get(timeout=timeout)
        return data

120

Bach Dániel committed
121
class Disk(TimeStampedModel):
Guba Sándor committed
122

123 124 125 126
    """A virtual disk.
    """
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
127
    BUS_TYPES = (('virtio', 'virtio'), ('ide', 'ide'), ('scsi', 'scsi'))
128
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
129 130
    filename = CharField(max_length=256, unique=True,
                         verbose_name=_("filename"))
131 132 133
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
134 135
    bus = CharField(max_length=10, choices=BUS_TYPES, null=True, blank=True,
                    default=None)
136
    size = FileSizeField(null=True, default=None)
137 138 139
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
    dev_num = CharField(default='a', max_length=1,
140
                        verbose_name=_("device number"))
141
    destroyed = DateTimeField(blank=True, default=None, null=True)
142

Guba Sándor committed
143 144
    is_ready = BooleanField(default=False)

145 146 147 148
    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')
149 150
        permissions = (
            ('create_empty_disk', _('Can create an empty disk.')),
151
            ('download_disk', _('Can download a disk.')),
152 153 154
            ('resize_disk', _('Can resize a disk.')),
            ('import_disk', _('Can import a disk.')),
            ('export_disk', _('Can export a disk.'))
155
        )
156

157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
    class DiskError(HumanReadableException):
        admin_message = None

        def __init__(self, disk, params=None, level=None, **kwargs):
            kwargs.update(params or {})
            self.disc = kwargs["disk"] = disk
            super(Disk.DiskError, self).__init__(
                level, self.message, self.admin_message or self.message,
                kwargs)

    class WrongDiskTypeError(DiskError):
        message = ugettext_noop("Operation can't be invoked on disk "
                                "'%(name)s' of type '%(type)s'.")

        admin_message = ugettext_noop(
            "Operation can't be invoked on disk "
            "'%(name)s' (%(pk)s) of type '%(type)s'.")

        def __init__(self, disk, params=None, **kwargs):
            super(Disk.WrongDiskTypeError, self).__init__(
                disk, params, type=disk.type, name=disk.name, pk=disk.pk)

    class DiskInUseError(DiskError):
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because it is in use.")

        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) because it is in use.")

        def __init__(self, disk, params=None, **kwargs):
Guba Sándor committed
189
            super(Disk.DiskInUseError, self).__init__(
190 191 192 193 194 195 196 197 198 199 200 201 202
                disk, params, name=disk.name, pk=disk.pk)

    class DiskIsNotReady(DiskError):
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because it has never been deployed.")

        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) [%(filename)s] because it has never been"
            "deployed.")

        def __init__(self, disk, params=None, **kwargs):
Guba Sándor committed
203
            super(Disk.DiskIsNotReady, self).__init__(
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
                disk, params, name=disk.name, pk=disk.pk,
                filename=disk.filename)

    class DiskBaseIsNotReady(DiskError):
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because its base has never been deployed.")

        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) [%(filename)s] because its base "
            "'%(b_name)s' (%(b_pk)s) [%(b_filename)s] has never been"
            "deployed.")

        def __init__(self, disk, params=None, **kwargs):
219
            base = kwargs.get('base')
Guba Sándor committed
220
            super(Disk.DiskBaseIsNotReady, self).__init__(
221 222 223
                disk, params, name=disk.name, pk=disk.pk,
                filename=disk.filename, b_name=base.name,
                b_pk=base.pk, b_filename=base.filename)
Guba Sándor committed
224

225 226
    @property
    def path(self):
227 228
        """The path where the files are stored.
        """
229
        return join(self.datastore.path, self.filename)
230 231

    @property
232
    def vm_format(self):
233 234
        """Returns the proper file format for different type of images.
        """
235 236 237
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
238
            'iso': 'raw',
239 240 241 242
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

243
    @property
244
    def format(self):
245 246
        """Returns the proper file format for different types of images.
        """
247 248 249 250 251 252 253 254 255
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
            'iso': 'iso',
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

    @property
256
    def device_type(self):
257 258
        """Returns the proper device prefix for different types of images.
        """
259
        return {
260 261
            'qcow2-norm': 'vd',
            'qcow2-snap': 'vd',
262
            'iso': 'sd',
263 264 265
            'raw-ro': 'vd',
            'raw-rw': 'vd',
        }[self.type]
266

267
    @property
268 269 270
    def device_bus(self):
        """Returns the proper device prefix for different types of images.
        """
271 272
        if self.bus:
            return self.bus
273 274 275
        return {
            'qcow2-norm': 'virtio',
            'qcow2-snap': 'virtio',
276
            'iso': 'ide',
277 278 279 280 281
            'raw-ro': 'virtio',
            'raw-rw': 'virtio',
        }[self.type]

    @property
282
    def is_deletable(self):
283
        """True if the associated file can be deleted.
284
        """
285
        # Check if all children and the disk itself is destroyed.
286
        return (self.destroyed is not None) and self.children_deletable
287

288 289 290
    @property
    def children_deletable(self):
        """True if all children of the disk are deletable.
291
        """
292
        return all(i.is_deletable for i in self.derivatives.all())
293

294
    @property
295
    def is_in_use(self):
296
        """True if disk is attached to an active VM.
297 298 299 300

        'In use' means the disk is attached to a VM which is not STOPPED, as
        any other VMs leave the disk in an inconsistent state.
        """
301
        return any(i.state != 'STOPPED' for i in self.instance_set.all())
302

303
    def get_appliance(self):
Bach Dániel committed
304 305
        """Return the Instance or InstanceTemplate object where the disk
        is used
306
        """
Bach Dániel committed
307
        try:
308 309 310 311
            app = self.template_set.all() or self.instance_set.all()
            return app.get()
        except ObjectDoesNotExist:
            return None
312

313 314
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.
315

316 317 318
        This method manipulates the database only.
        """
        type_mapping = {
319 320 321
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
322 323 324
        }

        if self.type not in type_mapping.keys():
325
            raise self.WrongDiskTypeError(self)
326 327

        new_type = type_mapping[self.type]
328

329 330
        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
331
                           type=new_type, dev_num=self.dev_num)
332 333

    def get_vmdisk_desc(self):
334 335
        """Serialize disk object to the vmdriver.
        """
336
        return {
337
            'source': self.path,
338
            'driver_type': self.vm_format,
339
            'driver_cache': 'none',
340
            'target_device': self.device_type + self.dev_num,
341
            'target_bus': self.device_bus,
342
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
343 344
        }

345
    def get_disk_desc(self):
346 347
        """Serialize disk object to the storage driver.
        """
348 349 350 351 352 353
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': self.base.filename if self.base else None,
354
            'type': 'snapshot' if self.base else 'normal'
355 356
        }

357 358
    def get_remote_queue_name(self, queue_id='storage', priority=None,
                              check_worker=True):
359 360
        """Returns the proper queue name based on the datastore.
        """
361
        if self.datastore:
362 363
            return self.datastore.get_remote_queue_name(queue_id, priority,
                                                        check_worker)
364 365 366
        else:
            return None

367
    def __unicode__(self):
368
        return u"%s (#%d)" % (self.name, self.id or 0)
369

370
    def clean(self, *args, **kwargs):
Guba Sándor committed
371
        if (self.size is None or "") and self.base:
372 373 374
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

375
    def deploy(self, user=None, task_uuid=None, timeout=15):
376 377 378 379 380
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

381 382 383 384 385 386 387
        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

388 389 390 391
        :return: True if a new reification of the disk has been created;
                 otherwise, False.
        :rtype: bool
        """
392 393 394 395
        if self.destroyed:
            self.destroyed = None
            self.save()

396
        if self.is_ready:
397
            return True
398
        if self.base and not self.base.is_ready:
399
            raise self.DiskBaseIsNotReady(self, base=self.base)
Guba Sándor committed
400 401 402 403 404 405 406 407 408 409
        queue_name = self.get_remote_queue_name('storage', priority="fast")
        disk_desc = self.get_disk_desc()
        if self.base is not None:
            storage_tasks.snapshot.apply_async(args=[disk_desc],
                                               queue=queue_name
                                               ).get(timeout=timeout)
        else:
            storage_tasks.create.apply_async(args=[disk_desc],
                                             queue=queue_name
                                             ).get(timeout=timeout)
410

411 412
        self.is_ready = True
        self.save()
Guba Sándor committed
413
        return True
414

415
    @classmethod
Guba Sándor committed
416 417
    def create(cls, user=None, **params):
        disk = cls.__create(user, params)
Guba Sándor committed
418
        disk.clean()
419
        disk.save()
420 421
        logger.debug(u"Disk created from: %s",
                     unicode(params.get("base", "nobase")))
422
        return disk
423

424
    @classmethod
Guba Sándor committed
425 426 427 428
    def __create(cls, user, params):
        datastore = params.pop('datastore', DataStore.objects.get())
        filename = params.pop('filename', str(uuid.uuid4()))
        disk = cls(filename=filename, datastore=datastore, **params)
429
        return disk
430 431

    @classmethod
Guba Sándor committed
432
    def download(cls, url, task, user=None, **params):
433 434 435 436
        """Create disk object and download data from url synchronusly.

        :param url: image url to download.
        :type url: url
437 438
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
439 440
        :param user: owner of the disk
        :type user: django.contrib.auth.User
441 442
        :param task_uuid: UUID of the local task
        :param abortable_task: UUID of the remote running abortable task.
443

444 445
        :return: The created Disk object
        :rtype: Disk
446
        """
Guba Sándor committed
447
        params.setdefault('name', url.split('/')[-1])
448 449 450
        params.setdefault('type', 'iso')
        params.setdefault('size', None)
        disk = cls.__create(params=params, user=user)
Guba Sándor committed
451 452
        queue_name = disk.get_remote_queue_name('storage', priority='slow')
        remote = storage_tasks.download.apply_async(
453
            kwargs={'url': url, 'parent_id': task.request.id,
Guba Sándor committed
454 455 456 457
                    'disk': disk.get_disk_desc()},
            queue=queue_name)
        while True:
            try:
458
                result = remote.get(timeout=5)
Guba Sándor committed
459
                break
460
            except TimeoutError as e:
Guba Sándor committed
461 462
                if task is not None and task.is_aborted():
                    AbortableAsyncResult(remote.id).abort()
463 464
                    raise humanize_exception(ugettext_noop(
                        "Operation aborted by user."), e)
465 466
        disk.size = result['size']
        disk.type = result['type']
467
        disk.checksum = result.get('checksum', None)
468
        disk.is_ready = True
Guba Sándor committed
469
        disk.save()
470
        return disk
471

472
    def destroy(self, user=None, task_uuid=None):
473 474 475
        if self.destroyed:
            return False

Guba Sándor committed
476 477 478
        self.destroyed = timezone.now()
        self.save()
        return True
479

480
    def restore(self, user=None, task_uuid=None, timeout=15):
481
        """Recover destroyed disk from trash if possible.
482
        """
483 484 485 486 487 488 489
        queue_name = self.datastore.get_remote_queue_name(
            'storage', priority='slow')
        logger.info("Image: %s at Datastore: %s recovered from trash." %
                    (self.filename, self.datastore.path))
        storage_tasks.recover_from_trash.apply_async(
            args=[self.datastore.path, self.filename],
            queue=queue_name).get(timeout=timeout)
490

491
    def save_as(self, task=None, user=None, task_uuid=None, timeout=300):
492 493
        """Save VM as template.

494 495 496 497
        Based on disk type:
        qcow2-norm, qcow2-snap --> qcow2-norm
        iso                    --> iso (with base)

498 499 500
        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.
        """
501
        mapping = {
502 503 504
            'qcow2-snap': ('qcow2-norm', None),
            'qcow2-norm': ('qcow2-norm', None),
            'iso': ("iso", self),
505 506
        }
        if self.type not in mapping.keys():
507
            raise self.WrongDiskTypeError(self)
508

509
        if self.is_in_use:
510 511
            raise self.DiskInUseError(self)

512
        if not self.is_ready:
Guba Sándor committed
513 514
            raise self.DiskIsNotReady(self)

515 516 517
        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

518
        new_type, new_base = mapping[self.type]
519

520 521
        disk = Disk.create(datastore=self.datastore,
                           base=new_base,
522
                           name=self.name, size=self.size,
523
                           type=new_type, dev_num=self.dev_num)
524

Guba Sándor committed
525
        queue_name = self.get_remote_queue_name("storage", priority="slow")
526 527
        remote = storage_tasks.merge.apply_async(kwargs={
            "old_json": self.get_disk_desc(),
528 529
            "new_json": disk.get_disk_desc(),
            "parent_id": task.request.id},
530 531 532 533 534 535
            queue=queue_name
        )  # Timeout
        while True:
            try:
                remote.get(timeout=5)
                break
536
            except TimeoutError as e:
537 538 539
                if task is not None and task.is_aborted():
                    AbortableAsyncResult(remote.id).abort()
                    disk.destroy()
540 541
                    raise humanize_exception(ugettext_noop(
                        "Operation aborted by user."), e)
542 543 544
            except:
                disk.destroy()
                raise
545 546
        disk.is_ready = True
        disk.save()
Guba Sándor committed
547
        return disk
548 549 550

    def get_absolute_url(self):
        return reverse('dashboard.views.disk-detail', kwargs={'pk': self.pk})
551 552 553

    @property
    def is_resizable(self):
554
        return self.type in ('qcow2-norm', 'raw-rw', 'qcow2-snap', )
555 556 557 558

    @property
    def is_exportable(self):
        return self.type in ('qcow2-norm', 'qcow2-snap', 'raw-rw', 'raw-ro')