models.py 22.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

18 19
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
20

21
from contextlib import contextmanager
22
import logging
23
from os.path import join
24 25
import uuid

26
from django.db.models import (Model, CharField, DateTimeField,
27
                              ForeignKey)
28
from django.utils import timezone
29
from django.utils.translation import ugettext_lazy as _
30
from model_utils.models import TimeStampedModel
31
from sizefield.models import FileSizeField
32

33
from acl.models import AclBase
34
from .tasks import local_tasks, remote_tasks
35
from celery.exceptions import TimeoutError
36
from manager.mancelery import celery
37
from common.models import (ActivityModel, activitycontextimpl,
38
                           WorkerNotFound)
39 40 41 42

logger = logging.getLogger(__name__)


43
class DataStore(Model):
Guba Sándor committed
44

45 46
    """Collection of virtual disks.
    """
47 48 49 50
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))
Guba Sándor committed
51

52 53 54 55 56 57 58 59
    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

60
    def get_remote_queue_name(self, queue_id, check_worker=True):
61 62
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
63 64
        if not check_worker or local_tasks.check_queue(self.hostname,
                                                       queue_id):
65 66 67
            return self.hostname + '.' + queue_id
        else:
            raise WorkerNotFound()
68

69 70 71
    def get_deletable_disks(self):
        return [disk.filename for disk in
                self.disk_set.filter(
72
                    destroyed__isnull=False) if disk.is_deletable]
73

74

75
class Disk(AclBase, TimeStampedModel):
Guba Sándor committed
76

77 78
    """A virtual disk.
    """
79 80 81 82 83
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
84 85
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
86
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
87 88
    filename = CharField(max_length=256, unique=True,
                         verbose_name=_("filename"))
89 90 91
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
92
    size = FileSizeField(null=True, default=None)
93 94 95
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
    dev_num = CharField(default='a', max_length=1,
96
                        verbose_name=_("device number"))
97
    destroyed = DateTimeField(blank=True, default=None, null=True)
98 99 100 101 102 103

    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')

104 105
    class WrongDiskTypeError(Exception):

106 107 108 109 110 111 112 113
        def __init__(self, type, message=None):
            if message is None:
                message = ("Operation can't be invoked on a disk of type '%s'."
                           % type)

            Exception.__init__(self, message)

            self.type = type
114

115 116
    class DiskInUseError(Exception):

117 118 119 120
        def __init__(self, disk, message=None):
            if message is None:
                message = ("The requested operation can't be performed on "
                           "disk '%s (%s)' because it is in use." %
Dudás Ádám committed
121
                           (disk.name, disk.filename))
122 123 124 125

            Exception.__init__(self, message)

            self.disk = disk
126

Guba Sándor committed
127
    class DiskIsNotReady(Exception):
Guba Sándor committed
128 129
        """ Exception for operations that need a deployed disk.
        """
Guba Sándor committed
130 131 132

        def __init__(self, disk, message=None):
            if message is None:
Guba Sándor committed
133
                message = ("The requested operation can't be performed on "
Guba Sándor committed
134 135 136 137 138 139 140
                           "disk '%s (%s)' because it has never been"
                           "deployed." % (disk.name, disk.filename))

            Exception.__init__(self, message)

            self.disk = disk

141
    @property
142
    def is_ready(self):
143 144 145 146
        """ Returns True if the disk is physically ready on the storage.

        It needs at least 1 successfull deploy action.
        """
147
        return self.activity_log.filter(activity_code__endswith="deploy",
148 149 150 151 152 153
                                        succeeded=True)

    @property
    def failed(self):
        """ Returns True if the last activity on the disk is failed.
        """
154 155
        result = self.activity_log.all().order_by('-id')[0].succeeded
        return not (result is None) and not result
156

157
    @property
158
    def path(self):
159 160
        """The path where the files are stored.
        """
161
        return join(self.datastore.path, self.filename)
162 163

    @property
164
    def vm_format(self):
165 166
        """Returns the proper file format for different type of images.
        """
167 168 169
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
170
            'iso': 'raw',
171 172 173 174
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

175
    @property
176
    def format(self):
177 178
        """Returns the proper file format for different types of images.
        """
179 180 181 182 183 184 185 186 187
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
            'iso': 'iso',
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

    @property
188
    def device_type(self):
189 190
        """Returns the proper device prefix for different types of images.
        """
191
        return {
192 193
            'qcow2-norm': 'vd',
            'qcow2-snap': 'vd',
194
            'iso': 'hd',
195 196 197
            'raw-ro': 'vd',
            'raw-rw': 'vd',
        }[self.type]
198

199
    def is_downloading(self):
200
        return self.size is None and not self.failed
201 202 203 204

    def get_download_percentage(self):
        if not self.is_downloading():
            return None
205 206 207 208 209 210 211 212
        try:
            task = self.activity_log.filter(
                activity_code__endswith="deploy",
                succeeded__isnull=True)[0].task_uuid
            result = celery.AsyncResult(id=task)
            return result.info.get("percent")
        except:
            return 0
213

214 215 216
    def get_latest_activity_result(self):
        return self.activity_log.latest("pk").result

217
    @property
218
    def is_deletable(self):
219
        """True if the associated file can be deleted.
220
        """
221
        # Check if all children and the disk itself is destroyed.
222
        return (self.destroyed is not None) and self.children_deletable
223

224 225 226
    @property
    def children_deletable(self):
        """True if all children of the disk are deletable.
227
        """
228
        return all(i.is_deletable for i in self.derivatives.all())
229

230
    @property
231
    def is_in_use(self):
232
        """True if disk is attached to an active VM.
233 234 235 236

        'In use' means the disk is attached to a VM which is not STOPPED, as
        any other VMs leave the disk in an inconsistent state.
        """
237
        return any(i.state != 'STOPPED' for i in self.instance_set.all())
238

239 240 241 242 243 244 245 246 247 248 249
    def get_appliance(self):
        """Return an Instance or InstanceTemplate object where the disk is used
        """
        instance = self.instance_set.all()
        template = self.template_set.all()
        app = list(instance) + list(template)
        if len(app) > 0:
            return app[0]
        else:
            return None

250 251
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.
252

253 254 255
        This method manipulates the database only.
        """
        type_mapping = {
256 257 258
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
259 260 261 262 263 264
        }

        if self.type not in type_mapping.keys():
            raise self.WrongDiskTypeError(self.type)

        new_type = type_mapping[self.type]
265

266 267 268
        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)
269 270

    def get_vmdisk_desc(self):
271 272
        """Serialize disk object to the vmdriver.
        """
273
        return {
274
            'source': self.path,
275
            'driver_type': self.vm_format,
276
            'driver_cache': 'none',
277
            'target_device': self.device_type + self.dev_num,
278
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
279 280
        }

281
    def get_disk_desc(self):
282 283
        """Serialize disk object to the storage driver.
        """
284 285 286 287 288 289
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': self.base.filename if self.base else None,
290
            'type': 'snapshot' if self.base else 'normal'
291 292
        }

293
    def get_remote_queue_name(self, queue_id='storage', check_worker=True):
294 295
        """Returns the proper queue name based on the datastore.
        """
296
        if self.datastore:
297
            return self.datastore.get_remote_queue_name(queue_id, check_worker)
298 299 300
        else:
            return None

301
    def __unicode__(self):
302
        return u"%s (#%d)" % (self.name, self.id or 0)
303

304
    def clean(self, *args, **kwargs):
Guba Sándor committed
305
        if (self.size is None or "") and self.base:
306 307 308
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

309
    def deploy(self, user=None, task_uuid=None, timeout=15):
310 311 312 313 314
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

315 316 317 318 319 320 321
        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

322 323 324 325
        :return: True if a new reification of the disk has been created;
                 otherwise, False.
        :rtype: bool
        """
326 327 328 329
        if self.destroyed:
            self.destroyed = None
            self.save()

330
        if self.is_ready:
331
            return True
332 333 334 335
        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:

            # Delegate create / snapshot jobs
336
            queue_name = self.get_remote_queue_name('storage')
337
            disk_desc = self.get_disk_desc()
338
            if self.base is not None:
339 340
                with act.sub_activity('creating_snapshot'):
                    remote_tasks.snapshot.apply_async(args=[disk_desc],
341 342
                                                      queue=queue_name
                                                      ).get(timeout=timeout)
343 344 345
            else:
                with act.sub_activity('creating_disk'):
                    remote_tasks.create.apply_async(args=[disk_desc],
346 347
                                                    queue=queue_name
                                                    ).get(timeout=timeout)
348 349

            return True
350

351
    def deploy_async(self, user=None):
352 353
        """Execute deploy asynchronously.
        """
354 355
        return local_tasks.deploy.apply_async(args=[self, user],
                                              queue="localhost.man")
356

357
    @classmethod
358 359 360
    def create(cls, instance=None, user=None, **params):
        """Create disk with activity.
        """
361
        datastore = params.pop('datastore', DataStore.objects.get())
362 363
        filename = params.pop('filename', str(uuid.uuid4()))
        disk = cls(filename=filename, datastore=datastore, **params)
Guba Sándor committed
364
        disk.clean()
365
        disk.save()
Guba Sándor committed
366
        logger.debug("Disk created: %s", params)
367 368 369 370 371
        with disk_activity(code_suffix="create",
                           user=user,
                           disk=disk):
            if instance:
                instance.disks.add(disk)
372
        return disk
373

374
    @classmethod
375 376 377 378 379 380 381 382
    def create_empty_async(cls, instance=None, user=None, **kwargs):
        """Execute deploy asynchronously.
        """
        return local_tasks.create_empty.apply_async(
            args=[cls, instance, user, kwargs], queue="localhost.man")

    @classmethod
    def create_empty(cls, instance=None, user=None, task_uuid=None, **kwargs):
383 384
        """Create empty Disk object.

385 386
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
387
        :param user: Creator of the disk.
388
        :type user: django.contrib.auth.User
389 390

        :return: Disk object without a real image, to be .deploy()ed later.
391
        """
392
        disk = Disk.create(instance, user, **kwargs)
393
        disk.deploy(user=user, task_uuid=task_uuid)
394
        return disk
395 396

    @classmethod
397
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
398
        """Create disk object and download data from url asynchrnously.
399

400 401
        :param url: URL of image to download.
        :type url: string
402 403
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
404 405 406 407 408 409
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
410 411
        kwargs.update({'cls': cls, 'url': url,
                       'instance': instance, 'user': user})
cloud committed
412 413
        return local_tasks.create_from_url.apply_async(
            kwargs=kwargs, queue='localhost.man')
414

415
    @classmethod
416 417
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
418 419 420 421
        """Create disk object and download data from url synchronusly.

        :param url: image url to download.
        :type url: url
422 423
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
424 425
        :param user: owner of the disk
        :type user: django.contrib.auth.User
426 427
        :param task_uuid: UUID of the local task
        :param abortable_task: UUID of the remote running abortable task.
428

429 430
        :return: The created Disk object
        :rtype: Disk
431
        """
432
        kwargs.setdefault('name', url.split('/')[-1])
433
        disk = Disk.create(type="iso", instance=instance, user=user,
434
                           size=None, **kwargs)
435 436
        queue_name = disk.get_remote_queue_name('storage')

437 438 439 440 441 442 443 444 445
        def __on_abort(activity, error):
            activity.disk.destroyed = timezone.now()
            activity.disk.save()

        if abortable_task:
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass
446

447
        with disk_activity(code_suffix='deploy', disk=disk,
448
                           task_uuid=task_uuid, user=user,
449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
                           on_abort=__on_abort) as act:
            with act.sub_activity('downloading_disk'):
                result = remote_tasks.download.apply_async(
                    kwargs={'url': url, 'parent_id': task_uuid,
                            'disk': disk.get_disk_desc()},
                    queue=queue_name)
                while True:
                    try:
                        size = result.get(timeout=5)
                        break
                    except TimeoutError:
                        if abortable_task and abortable_task.is_aborted():
                            AbortableAsyncResult(result.id).abort()
                            raise AbortException("Download aborted by user.")
                disk.size = size
                disk.save()
465
        return disk
466

467
    def destroy(self, user=None, task_uuid=None):
468 469 470
        if self.destroyed:
            return False

471 472 473 474
        with disk_activity(code_suffix='destroy', disk=self,
                           task_uuid=task_uuid, user=user):
            self.destroyed = timezone.now()
            self.save()
475

476
            return True
477

478
    def destroy_async(self, user=None):
479 480
        """Execute destroy asynchronously.
        """
481 482
        return local_tasks.destroy.apply_async(args=[self, user],
                                               queue='localhost.man')
483

484
    def restore(self, user=None, task_uuid=None):
485
        """Recover destroyed disk from trash if possible.
486 487 488 489 490 491 492 493
        """
        # TODO
        pass

    def restore_async(self, user=None):
        local_tasks.restore.apply_async(args=[self, user],
                                        queue='localhost.man')

494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
    def clone_async(self, new_disk=None, timeout=300, user=None):
        """Clone a Disk to another Disk

        :param new_disk: optional, the new Disk object to clone in
        :type new_disk: storage.models.Disk
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User

        :return: AsyncResult
        """
        return local_tasks.clone.apply_async(args=[self, new_disk,
                                                   timeout, user],
                                             queue="localhost.man")

    def clone(self, disk=None, user=None, task_uuid=None, timeout=300):
        """Cloning Disk into another Disk.

        The Disk.type can'T be snapshot.

        :param new_disk: optional, the new Disk object to clone in
        :type new_disk: storage.models.Disk
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User

        :return: the cloned Disk object.
        """
        banned_types = ['qcow2-snap']
        if self.type in banned_types:
            raise self.WrongDiskTypeError(self.type)
        if self.is_in_use:
            raise self.DiskInUseError(self)
525
        if not self.is_ready:
526 527
            raise self.DiskIsNotReady(self)
        if not disk:
528 529 530
            base = None
            if self.type == "iso":
                base = self
531 532
            disk = Disk.create(datastore=self.datastore,
                               name=self.name, size=self.size,
533
                               type=self.type, base=base)
534 535 536 537 538 539 540 541 542 543 544 545

        with disk_activity(code_suffix="clone", disk=self,
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # Timeout
            return disk

546
    def save_as(self, user=None, task_uuid=None, timeout=300):
547 548
        """Save VM as template.

549 550 551 552
        Based on disk type:
        qcow2-norm, qcow2-snap --> qcow2-norm
        iso                    --> iso (with base)

553 554 555
        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.
        """
556
        mapping = {
557 558 559
            'qcow2-snap': ('qcow2-norm', None),
            'qcow2-norm': ('qcow2-norm', None),
            'iso': ("iso", self),
560 561 562 563
        }
        if self.type not in mapping.keys():
            raise self.WrongDiskTypeError(self.type)

564
        if self.is_in_use:
565 566
            raise self.DiskInUseError(self)

567
        if not self.is_ready:
Guba Sándor committed
568 569
            raise self.DiskIsNotReady(self)

570 571 572
        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

573
        new_type, new_base = mapping[self.type]
574

575 576
        disk = Disk.create(datastore=self.datastore,
                           base=new_base,
577 578
                           name=self.name, size=self.size,
                           type=new_type)
579

580
        with disk_activity(code_suffix="save_as", disk=self,
581 582 583 584 585 586 587 588 589
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # Timeout
            return disk
590 591 592 593 594 595 596 597


class DiskActivity(ActivityModel):
    disk = ForeignKey(Disk, related_name='activity_log',
                      help_text=_('Disk this activity works on.'),
                      verbose_name=_('disk'))

    @classmethod
598
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
599
        act = cls(activity_code='storage.Disk.' + code_suffix,
600
                  disk=disk, parent=None, started=timezone.now(),
601
                  task_uuid=task_uuid, user=user)
602
        act.save()
603
        return act
604

605 606 607 608 609 610 611 612 613
    def __unicode__(self):
        if self.parent:
            return '{}({})->{}'.format(self.parent.activity_code,
                                       self.disk,
                                       self.activity_code)
        else:
            return '{}({})'.format(self.activity_code,
                                   self.disk)

614 615 616
    def create_sub(self, code_suffix, task_uuid=None):
        act = DiskActivity(
            activity_code=self.activity_code + '.' + code_suffix,
617
            disk=self.disk, parent=self, started=timezone.now(),
618 619 620
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act
621

622 623 624
    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        act = self.create_sub(code_suffix, task_uuid)
625
        return activitycontextimpl(act)
626

627

628
@contextmanager
629 630
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
631
    act = DiskActivity.create(code_suffix, disk, task_uuid, user)
632
    return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)