models.py 21 KB
Newer Older
1 2
# -*- coding: utf-8 -*-

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

20
from __future__ import unicode_literals
21 22

import logging
23 24
import uuid

25
import re
Guba Sándor committed
26
from celery.contrib.abortable import AbortableAsyncResult
27
from celery.exceptions import TimeoutError
28
from django.core.exceptions import ObjectDoesNotExist
29
from django.core.urlresolvers import reverse
30 31
from django.db.models import (Model, BooleanField, CharField, DateTimeField,
                              ForeignKey)
32
from django.utils import timezone
33
from django.utils.translation import ugettext_lazy as _, ugettext_noop
34
from model_utils.models import TimeStampedModel
35
from os.path import join
36
from sizefield.models import FileSizeField
37

38
from common.models import (
39
    WorkerNotFound, HumanReadableException, humanize_exception, method_cache
40
)
41
from .tasks import local_tasks, storage_tasks
42 43 44 45

logger = logging.getLogger(__name__)


46
class DataStore(Model):
    """Collection of virtual disks.

    Remote operations are sent to the Celery queue named
    ``<hostname>.<queue_id>[.<priority>]`` and are given ``path`` as their
    argument.
    """
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))

    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

    def get_remote_queue_name(self, queue_id, priority=None,
                              check_worker=True):
        """Return the name of the remote queue serving this data store.

        :param queue_id: queue identifier appended to the hostname.
        :param priority: optional suffix appended after the queue id.
        :param check_worker: when true, ask ``local_tasks.check_queue``
                             whether the queue is served before returning.

        :raises WorkerNotFound: if the check is requested and fails.
        """
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
        if check_worker and not local_tasks.check_queue(
                self.hostname, queue_id, priority):
            raise WorkerNotFound()

        queue_name = self.hostname + '.' + queue_id
        if priority is not None:
            queue_name = queue_name + '.' + priority
        return queue_name

    def get_deletable_disks(self):
        """Return the file names of destroyed disks that are deletable."""
        return [disk.filename for disk in
                self.disk_set.filter(
                    destroyed__isnull=False) if disk.is_deletable]

    def _list_remote_files(self, timeout):
        """Return the set of file names reported by the remote file listing.

        Runs ``storage_tasks.list_files`` on the "slow" storage queue and
        waits at most ``timeout`` seconds for the result.
        """
        queue_name = self.get_remote_queue_name('storage', "slow")
        return set(storage_tasks.list_files.apply_async(
            args=[self.path], queue=queue_name).get(timeout=timeout))

    @method_cache(30)
    def get_statistics(self, timeout=15):
        """Return the result of the remote ``get_storage_stat`` task."""
        q = self.get_remote_queue_name("storage", priority="fast")
        return storage_tasks.get_storage_stat.apply_async(
            args=[self.path], queue=q).get(timeout=timeout)

    @method_cache(30)
    def get_orphan_disks(self, timeout=25):
        """Disk image files without Disk object in the database.

        Files whose name starts with ``cloud-<digits>.dump`` are skipped.
        """
        files = self._list_remote_files(timeout)
        disks = set(disk.filename for disk in self.disk_set.all())
        return [filename for filename in files - disks
                if not re.match(r'cloud-[0-9]*\.dump', filename)]

    @method_cache(30)
    def get_missing_disks(self, timeout=25):
        """Disk objects without disk image files.

        Only ready, not destroyed disks of *this* data store are considered:
        the file listing covers this data store's path only, so disks that
        live elsewhere must not be compared against it.
        """
        files = self._list_remote_files(timeout)
        disks = self.disk_set.filter(destroyed__isnull=True, is_ready=True)
        return disks.exclude(filename__in=files)

    @method_cache(120)
    def get_file_statistics(self, timeout=90):
        """Return the result of the remote ``get_file_statistics`` task."""
        queue_name = self.get_remote_queue_name('storage', "slow")
        data = storage_tasks.get_file_statistics.apply_async(
            args=[self.path], queue=queue_name).get(timeout=timeout)
        return data

119

Bach Dániel committed
120
class Disk(TimeStampedModel):
121 122 123 124
    """A virtual disk.
    """
    # (stored value, label) pairs offered by the ``type`` field.
    TYPES = [
        ('qcow2-norm', 'qcow2 normal'),
        ('qcow2-snap', 'qcow2 snapshot'),
        ('iso', 'iso'),
        ('raw-ro', 'raw read-only'),
        ('raw-rw', 'raw'),
    ]
    # (stored value, label) pairs offered by the ``bus`` field.
    BUS_TYPES = (
        ('virtio', 'virtio'),
        ('ide', 'ide'),
        ('scsi', 'scsi'),
    )
    # (format identifier, translated label) pairs for disk export.
    EXPORT_FORMATS = (
        ('vmdk', _('VMware disk image')),
        ('qcow2', _('QEMU disk image')),
        ('vdi', _('VirtualBox disk image')),
        ('vpc', _('HyperV disk image')),
    )

    name = CharField(
        blank=True, max_length=100, verbose_name=_("name"))
    filename = CharField(
        max_length=256, unique=True, verbose_name=_("filename"))
    datastore = ForeignKey(
        DataStore, verbose_name=_("datastore"),
        help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
    # Optional explicit bus; when empty, ``device_bus`` derives it from type.
    bus = CharField(
        max_length=10, choices=BUS_TYPES, null=True, blank=True,
        default=None)
    size = FileSizeField(null=True, default=None)
    # Parent disk; the reverse accessor on the parent is ``derivatives``.
    base = ForeignKey(
        'self', blank=True, null=True, related_name='derivatives')
    dev_num = CharField(
        default='a', max_length=1, verbose_name=_("device number"))
    # Timestamp written by ``destroy``; None while the disk is alive.
    destroyed = DateTimeField(blank=True, default=None, null=True)

    is_ready = BooleanField(default=False)

147 148 149 150
    class Meta:
        # Disks are listed alphabetically by name.
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')
        # Extra permissions on top of Django's default ones.
        permissions = (
            ('create_empty_disk', _('Can create an empty disk.')),
            ('download_disk', _('Can download a disk.')),
            ('resize_disk', _('Can resize a disk.')),
            ('import_disk', _('Can import a disk.')),
            ('export_disk', _('Can export a disk.')),
        )
158

159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
    class DiskError(HumanReadableException):
        """Base class of disk related, human readable errors.

        Subclasses provide ``message`` and may provide ``admin_message``;
        when the latter is unset the user message is used for admins too.
        """
        admin_message = None

        def __init__(self, disk, params=None, level=None, **kwargs):
            # Explicit params override the keyword arguments; the disk itself
            # is always available to the message templates as ``disk``.
            if params:
                kwargs.update(params)
            kwargs["disk"] = disk
            self.disc = disk

            user_text = self.message
            admin_text = self.admin_message or self.message
            super(Disk.DiskError, self).__init__(
                level, user_text, admin_text, kwargs)

    class WrongDiskTypeError(DiskError):
        """The operation is not available for the type of the disk."""
        message = ugettext_noop("Operation can't be invoked on disk "
                                "'%(name)s' of type '%(type)s'.")

        admin_message = ugettext_noop(
            "Operation can't be invoked on disk "
            "'%(name)s' (%(pk)s) of type '%(type)s'.")

        def __init__(self, disk, params=None, **kwargs):
            # Values substituted into the message templates above.
            context = dict(type=disk.type, name=disk.name, pk=disk.pk)
            super(Disk.WrongDiskTypeError, self).__init__(
                disk, params, **context)

    class DiskInUseError(DiskError):
        """The operation is refused because the disk is in use."""
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because it is in use.")

        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) because it is in use.")

        def __init__(self, disk, params=None, **kwargs):
            # Values substituted into the message templates above.
            context = dict(name=disk.name, pk=disk.pk)
            super(Disk.DiskInUseError, self).__init__(
                disk, params, **context)

    class DiskIsNotReady(DiskError):
        """The operation needs a disk whose ``is_ready`` flag is set."""
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because it has never been deployed.")

        # The adjacent literals are concatenated, so the space after "been"
        # is required; without it the text read "beendeployed.".
        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) [%(filename)s] because it has never "
            "been deployed.")

        def __init__(self, disk, params=None, **kwargs):
            super(Disk.DiskIsNotReady, self).__init__(
                disk, params, name=disk.name, pk=disk.pk,
                filename=disk.filename)

    class DiskBaseIsNotReady(DiskError):
        """The operation needs a base disk whose ``is_ready`` flag is set.

        The base may be passed as the ``base`` keyword argument; when it is
        omitted, ``disk.base`` is used.
        """
        message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' because its base has never been deployed.")

        # The adjacent literals are concatenated, so the space after "been"
        # is required; without it the text read "beendeployed.".
        admin_message = ugettext_noop(
            "The requested operation can't be performed on "
            "disk '%(name)s' (%(pk)s) [%(filename)s] because its base "
            "'%(b_name)s' (%(b_pk)s) [%(b_filename)s] has never been "
            "deployed.")

        def __init__(self, disk, params=None, **kwargs):
            base = kwargs.get('base')
            if base is None:
                # Avoid an AttributeError while building the error itself.
                base = disk.base
            super(Disk.DiskBaseIsNotReady, self).__init__(
                disk, params, name=disk.name, pk=disk.pk,
                filename=disk.filename, b_name=base.name,
                b_pk=base.pk, b_filename=base.filename)
Guba Sándor committed
226

227 228
    @property
    def path(self):
        """Location of the image file: datastore path joined with filename.
        """
        directory = self.datastore.path
        return join(directory, self.filename)
232 233

    @property
    def vm_format(self):
        """Driver format of the image: 'qcow2' for qcow2 types, else 'raw'.

        Raises KeyError for a type that is not listed.
        """
        driver_formats = dict.fromkeys(('qcow2-norm', 'qcow2-snap'), 'qcow2')
        driver_formats.update(
            dict.fromkeys(('iso', 'raw-ro', 'raw-rw'), 'raw'))
        return driver_formats[self.type]

245
    @property
    def format(self):
        """File format of the image: 'qcow2', 'iso' or 'raw'.

        Raises KeyError for a type that is not listed.
        """
        file_formats = dict.fromkeys(('qcow2-norm', 'qcow2-snap'), 'qcow2')
        file_formats['iso'] = 'iso'
        file_formats.update(dict.fromkeys(('raw-ro', 'raw-rw'), 'raw'))
        return file_formats[self.type]

    @property
    def device_type(self):
        """Device name prefix: 'sd' for iso images, 'vd' for the rest.

        Raises KeyError for a type that is not listed.
        """
        prefixes = dict.fromkeys(
            ('qcow2-norm', 'qcow2-snap', 'raw-ro', 'raw-rw'), 'vd')
        prefixes['iso'] = 'sd'
        return prefixes[self.type]
268

269
    @property
    def device_bus(self):
        """Bus of the device: the explicit ``bus`` if set, else a default.

        The default is 'ide' for iso images and 'virtio' for the other
        types; an unlisted type raises KeyError.
        """
        explicit_bus = self.bus
        if explicit_bus:
            return explicit_bus

        default_buses = dict.fromkeys(
            ('qcow2-norm', 'qcow2-snap', 'raw-ro', 'raw-rw'), 'virtio')
        default_buses['iso'] = 'ide'
        return default_buses[self.type]

    @property
    def is_deletable(self):
        """True if the associated file can be deleted.

        That requires the disk itself to be destroyed and all of its
        children to be deletable as well.
        """
        if self.destroyed is None:
            return False
        return self.children_deletable
289

290 291 292
    @property
    def children_deletable(self):
        """True if every disk derived from this one is deletable.

        A disk without derivatives trivially satisfies this.
        """
        for child in self.derivatives.all():
            if not child.is_deletable:
                return False
        return True
295

296
    @property
    def is_in_use(self):
        """True if the disk is attached to a VM that is not STOPPED.

        Only STOPPED VMs are treated as leaving the disk in a consistent
        state, so a VM in any other state marks the disk as in use.
        """
        for instance in self.instance_set.all():
            if instance.state != 'STOPPED':
                return True
        return False
304

305
    def get_appliance(self):
        """Return the single template or instance that uses the disk.

        Templates take precedence: instances are only looked at when no
        template refers to the disk. Returns None if nothing uses it.
        """
        try:
            users = self.template_set.all()
            if not users:
                users = self.instance_set.all()
            return users.get()
        except ObjectDoesNotExist:
            return None
314

315 316
    def get_exclusive(self):
        """Create a new disk based on this one for exclusive usage.

        Only the database is touched here. The type of the new disk follows
        from the type of this disk; types without a counterpart raise
        WrongDiskTypeError.
        """
        exclusive_types = {
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
        }
        new_type = exclusive_types.get(self.type)
        if new_type is None:
            raise self.WrongDiskTypeError(self)

        return Disk.create(
            base=self, datastore=self.datastore, name=self.name,
            size=self.size, type=new_type, dev_num=self.dev_num)
334 335

    def get_vmdisk_desc(self):
        """Build the dict that describes the disk for the vmdriver.
        """
        source = self.path
        driver_type = self.vm_format
        target_device = self.device_type + self.dev_num
        target_bus = self.device_bus
        # iso images are presented as CD-ROM drives, the rest as disks.
        if self.type == 'iso':
            disk_device = 'cdrom'
        else:
            disk_device = 'disk'

        return {
            'source': source,
            'driver_type': driver_type,
            'driver_cache': 'none',
            'target_device': target_device,
            'target_bus': target_bus,
            'disk_device': disk_device,
        }

347
    def get_disk_desc(self):
        """Build the dict that describes the disk for the storage driver.

        A disk with a base is described as a 'snapshot' of that base,
        otherwise as a 'normal' disk without base name.
        """
        description = {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
        }
        parent = self.base
        if parent:
            description['base_name'] = parent.filename
            description['type'] = 'snapshot'
        else:
            description['base_name'] = None
            description['type'] = 'normal'
        return description

359 360
    def get_remote_queue_name(self, queue_id='storage', priority=None,
                              check_worker=True):
        """Ask the datastore of the disk for the proper queue name.

        Returns None when the disk has no datastore.
        """
        store = self.datastore
        if not store:
            return None
        return store.get_remote_queue_name(queue_id, priority, check_worker)

369
    def __unicode__(self):
        """Render as 'name (#id)'; an unsaved disk is shown with id 0."""
        label = self.name
        number = self.id or 0
        return u"%s (#%d)" % (label, number)
371

372
    def clean(self, *args, **kwargs):
        """Inherit the size of the base disk when no size is given.

        Both None and the empty string count as a missing size. The previous
        condition, ``(self.size is None or "")``, never matched the empty
        string because its second operand is always falsy.
        """
        if self.size in (None, "") and self.base:
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

377
    def deploy(self, user=None, task_uuid=None, timeout=15):
        """Reify the disk model on the associated data store.

        A destroyed disk is marked as not destroyed first. A disk that is
        already ready is left alone; otherwise the image is created remotely
        (as a snapshot when the disk has a base) and the disk is marked
        ready.

        :param user: The user who's issuing the command (not used here).
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID (not used here).
        :type task_uuid: str

        :param timeout: Seconds to wait for the remote task.

        :raises DiskBaseIsNotReady: if the base of the disk is not ready.

        :return: True, both when the disk was already ready and when it has
                 just been created.
        :rtype: bool
        """
        if self.destroyed:
            self.destroyed = None
            self.save()

        if self.is_ready:
            return True

        if self.base and not self.base.is_ready:
            raise self.DiskBaseIsNotReady(self, base=self.base)

        queue = self.get_remote_queue_name('storage', priority="fast")
        description = self.get_disk_desc()
        # Disks with a base are snapshots of it; the rest are created empty.
        if self.base is not None:
            remote_task = storage_tasks.snapshot
        else:
            remote_task = storage_tasks.create
        remote_task.apply_async(
            args=[description], queue=queue).get(timeout=timeout)

        self.is_ready = True
        self.save()
        return True
416

417
    @classmethod
    def create(cls, user=None, **params):
        """Build a disk from ``params``, clean it and save it.

        Only the database object is created here; see ``__create`` for the
        defaults applied to ``params``.
        """
        new_disk = cls.__create(user, params)
        new_disk.clean()
        new_disk.save()

        origin = params.get("base", "nobase")
        logger.debug(u"Disk created from: %s", unicode(origin))
        return new_disk
425

426
    @classmethod
    def __create(cls, user, params):
        """Instantiate (without saving) a disk from ``params``.

        ``datastore`` and ``filename`` are removed from ``params``; the rest
        is handed to the model constructor. Defaults are computed only when
        the corresponding key is missing: a missing datastore falls back to
        the only DataStore in the database, a missing filename to a fresh
        UUID. Previously the datastore query ran (and could raise) even
        when a datastore had been passed in.
        """
        if 'datastore' in params:
            datastore = params.pop('datastore')
        else:
            datastore = DataStore.objects.get()

        if 'filename' in params:
            filename = params.pop('filename')
        else:
            filename = str(uuid.uuid4())

        disk = cls(filename=filename, datastore=datastore, **params)
        return disk
432 433

    @classmethod
    def download(cls, url, task, user=None, **params):
        """Create disk object and download data from url synchronously.

        The remote download is polled every 5 seconds; if ``task`` reports
        that it has been aborted, the remote task is aborted too.

        :param url: image url to download.
        :type url: url
        :param task: the local task running the download; its request id is
                     passed to the remote task as ``parent_id``.
        :param user: owner of the disk
        :type user: django.contrib.auth.User
        :param params: further attributes of the new disk; name, type and
                       size get defaults when missing.

        :return: The created Disk object
        :rtype: Disk
        """
        # The default name is the last path segment of the url.
        params.setdefault('name', url.rsplit('/', 1)[-1])
        params.setdefault('type', 'iso')
        params.setdefault('size', None)
        disk = cls.__create(params=params, user=user)

        queue_name = disk.get_remote_queue_name('storage', priority='slow')
        remote_kwargs = {
            'url': url,
            'parent_id': task.request.id,
            'disk': disk.get_disk_desc(),
        }
        remote = storage_tasks.download.apply_async(
            kwargs=remote_kwargs, queue=queue_name)

        finished = False
        while not finished:
            try:
                result = remote.get(timeout=5)
            except TimeoutError as e:
                if task is None or not task.is_aborted():
                    # Still running and nobody cancelled it: keep waiting.
                    continue
                AbortableAsyncResult(remote.id).abort()
                raise humanize_exception(ugettext_noop(
                    "Operation aborted by user."), e)
            else:
                finished = True

        disk.size = result['size']
        disk.type = result['type']
        disk.checksum = result.get('checksum', None)
        disk.is_ready = True
        disk.save()
        return disk
473

474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490
    @classmethod
    def import_disk(cls, user, name, download_link, timeout=3600):
        """Create a 'qcow2-norm' disk and fill it by a remote import.

        The remote task receives the disk description and the download
        link; its result is stored as the size of the disk, which is then
        marked ready.
        """
        disk = cls.create(user, name=name, type='qcow2-norm')

        queue_name = disk.get_remote_queue_name('storage', priority='slow')
        description = disk.get_disk_desc()
        remote = storage_tasks.import_disk.apply_async(
            args=[description, download_link], queue=queue_name)

        disk.size = remote.get(timeout=timeout)
        disk.is_ready = True
        disk.save()
        return disk

    def export(self, format, upload_link, timeout=3600):
        """Run the remote export task and wait for it to finish.

        The exported file is named after the disk, or after its file name
        when the disk has an empty name.
        """
        if self.name != '':
            exported_name = self.name
        else:
            exported_name = self.filename

        queue_name = self.get_remote_queue_name('storage', priority='slow')
        task_args = [self.get_disk_desc(), format, exported_name,
                     upload_link]
        pending = storage_tasks.export.apply_async(
            args=task_args, queue=queue_name)
        pending.get(timeout=timeout)
497

498
    def destroy(self, user=None, task_uuid=None):
        """Mark the disk as destroyed in the database.

        :return: False if the disk was already destroyed, True otherwise.
        """
        newly_destroyed = not self.destroyed
        if newly_destroyed:
            self.destroyed = timezone.now()
            self.save()
        return newly_destroyed
505

506
    def restore(self, user=None, task_uuid=None, timeout=15):
        """Recover destroyed disk from trash if possible.

        Runs the remote ``recover_from_trash`` task on the "slow" storage
        queue and waits at most ``timeout`` seconds for it. Success is
        logged only after the task has returned, so a failed or timed out
        recovery no longer leaves a misleading log entry.
        """
        queue_name = self.datastore.get_remote_queue_name(
            'storage', priority='slow')
        storage_tasks.recover_from_trash.apply_async(
            args=[self.datastore.path, self.filename],
            queue=queue_name).get(timeout=timeout)
        logger.info("Image: %s at Datastore: %s recovered from trash.",
                    self.filename, self.datastore.path)
516

517
    def save_as(self, task=None, user=None, task_uuid=None, timeout=300):
        """Save VM as template.

        Based on disk type:
        qcow2-norm, qcow2-snap --> qcow2-norm
        iso                    --> iso (with base)

        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.

        :param task: optional local task; when given, its request id is
                     passed to the remote task as ``parent_id`` and its
                     abort flag is polled while waiting. Without a task,
                     ``parent_id`` is None.

        :raises WrongDiskTypeError: for disk types that can't be saved.
        :raises DiskInUseError: if the disk is in use.
        :raises DiskIsNotReady: if the disk is not ready.

        :return: the new, ready disk.
        """
        mapping = {
            'qcow2-snap': ('qcow2-norm', None),
            'qcow2-norm': ('qcow2-norm', None),
            'iso': ("iso", self),
        }
        if self.type not in mapping:
            raise self.WrongDiskTypeError(self)

        if self.is_in_use:
            raise self.DiskInUseError(self)

        if not self.is_ready:
            raise self.DiskIsNotReady(self)

        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

        new_type, new_base = mapping[self.type]

        disk = Disk.create(datastore=self.datastore,
                           base=new_base,
                           name=self.name, size=self.size,
                           type=new_type, dev_num=self.dev_num)

        queue_name = self.get_remote_queue_name("storage", priority="slow")
        # ``task`` is optional (it defaults to None), so it must not be
        # dereferenced unconditionally.
        parent_id = task.request.id if task is not None else None
        remote = storage_tasks.merge.apply_async(kwargs={
            "old_json": self.get_disk_desc(),
            "new_json": disk.get_disk_desc(),
            "parent_id": parent_id},
            queue=queue_name
        )
        # Poll in short steps so that an abort request is noticed quickly.
        while True:
            try:
                remote.get(timeout=5)
                break
            except TimeoutError as e:
                if task is not None and task.is_aborted():
                    AbortableAsyncResult(remote.id).abort()
                    disk.destroy()
                    raise humanize_exception(ugettext_noop(
                        "Operation aborted by user."), e)
            except:
                # Deliberately broad: whatever went wrong, the half-made
                # disk is marked destroyed and the error is re-raised.
                disk.destroy()
                raise
        disk.is_ready = True
        disk.save()
        return disk
574 575 576

    def get_absolute_url(self):
        """Return the URL of the dashboard detail view of the disk."""
        url_kwargs = {'pk': self.pk}
        return reverse('dashboard.views.disk-detail', kwargs=url_kwargs)
577 578 579

    @property
    def is_resizable(self):
        """True for the disk types that can be resized."""
        resizable_types = ('qcow2-norm', 'qcow2-snap', 'raw-rw')
        return self.type in resizable_types
581 582 583 584

    @property
    def is_exportable(self):
        """True for the disk types that can be exported."""
        exportable_types = ('qcow2-norm', 'qcow2-snap', 'raw-ro', 'raw-rw')
        return self.type in exportable_types