models.py 22.7 KB
Newer Older
1 2
# -*- coding: utf-8 -*-

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

20
from __future__ import unicode_literals
21

22
from contextlib import contextmanager
23
import logging
24
from os.path import join
25 26
import uuid

27
from celery.signals import worker_ready
28
from django.db.models import (Model, CharField, DateTimeField,
29
                              ForeignKey)
30
from django.utils import timezone
31
from django.utils.translation import ugettext_lazy as _
32
from model_utils.models import TimeStampedModel
33
from sizefield.models import FileSizeField
34

35
from acl.models import AclBase
36
from .tasks import local_tasks, remote_tasks
37
from celery.exceptions import TimeoutError
38
from manager.mancelery import celery
39
from common.models import (ActivityModel, activitycontextimpl,
40
                           WorkerNotFound)
41 42 43 44

logger = logging.getLogger(__name__)


45
class DataStore(Model):
Guba Sándor committed
46

47 48
    """Collection of virtual disks.
    """
49 50 51 52
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))
Guba Sándor committed
53

54 55 56 57 58 59 60 61
    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

62
    def get_remote_queue_name(self, queue_id, check_worker=True):
63 64
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
65 66
        if not check_worker or local_tasks.check_queue(self.hostname,
                                                       queue_id):
67 68 69
            return self.hostname + '.' + queue_id
        else:
            raise WorkerNotFound()
70

71 72 73
    def get_deletable_disks(self):
        return [disk.filename for disk in
                self.disk_set.filter(
74
                    destroyed__isnull=False) if disk.is_deletable]
75

76

77
class Disk(AclBase, TimeStampedModel):
Guba Sándor committed
78

79 80
    """A virtual disk.
    """
81 82 83 84 85
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
86 87
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
88
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
89 90
    filename = CharField(max_length=256, unique=True,
                         verbose_name=_("filename"))
91 92 93
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
94
    size = FileSizeField(null=True, default=None)
95 96 97
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
    dev_num = CharField(default='a', max_length=1,
98
                        verbose_name=_("device number"))
99
    destroyed = DateTimeField(blank=True, default=None, null=True)
100 101 102 103 104 105

    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')

106 107
    class WrongDiskTypeError(Exception):

108 109 110 111 112 113 114 115
        def __init__(self, type, message=None):
            if message is None:
                message = ("Operation can't be invoked on a disk of type '%s'."
                           % type)

            Exception.__init__(self, message)

            self.type = type
116

117 118
    class DiskInUseError(Exception):

119 120 121 122
        def __init__(self, disk, message=None):
            if message is None:
                message = ("The requested operation can't be performed on "
                           "disk '%s (%s)' because it is in use." %
Dudás Ádám committed
123
                           (disk.name, disk.filename))
124 125 126 127

            Exception.__init__(self, message)

            self.disk = disk
128

Guba Sándor committed
129
    class DiskIsNotReady(Exception):
Guba Sándor committed
130 131
        """ Exception for operations that need a deployed disk.
        """
Guba Sándor committed
132 133 134

        def __init__(self, disk, message=None):
            if message is None:
Guba Sándor committed
135
                message = ("The requested operation can't be performed on "
Guba Sándor committed
136 137 138 139 140 141 142
                           "disk '%s (%s)' because it has never been"
                           "deployed." % (disk.name, disk.filename))

            Exception.__init__(self, message)

            self.disk = disk

143
    @property
144
    def is_ready(self):
145 146 147 148
        """ Returns True if the disk is physically ready on the storage.

        It needs at least 1 successfull deploy action.
        """
149
        return self.activity_log.filter(activity_code__endswith="deploy",
150 151 152 153 154 155
                                        succeeded=True)

    @property
    def failed(self):
        """ Returns True if the last activity on the disk is failed.
        """
156 157
        result = self.activity_log.all().order_by('-id')[0].succeeded
        return not (result is None) and not result
158

159
    @property
160
    def path(self):
161 162
        """The path where the files are stored.
        """
163
        return join(self.datastore.path, self.filename)
164 165

    @property
166
    def vm_format(self):
167 168
        """Returns the proper file format for different type of images.
        """
169 170 171
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
172
            'iso': 'raw',
173 174 175 176
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

177
    @property
178
    def format(self):
179 180
        """Returns the proper file format for different types of images.
        """
181 182 183 184 185 186 187 188 189
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
            'iso': 'iso',
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

    @property
190
    def device_type(self):
191 192
        """Returns the proper device prefix for different types of images.
        """
193
        return {
194 195
            'qcow2-norm': 'vd',
            'qcow2-snap': 'vd',
196
            'iso': 'hd',
197 198 199
            'raw-ro': 'vd',
            'raw-rw': 'vd',
        }[self.type]
200

201
    def is_downloading(self):
202
        return self.size is None and not self.failed
203 204 205 206

    def get_download_percentage(self):
        if not self.is_downloading():
            return None
207 208 209 210 211 212 213 214
        try:
            task = self.activity_log.filter(
                activity_code__endswith="deploy",
                succeeded__isnull=True)[0].task_uuid
            result = celery.AsyncResult(id=task)
            return result.info.get("percent")
        except:
            return 0
215

216 217 218
    def get_latest_activity_result(self):
        return self.activity_log.latest("pk").result

219
    @property
220
    def is_deletable(self):
221
        """True if the associated file can be deleted.
222
        """
223
        # Check if all children and the disk itself is destroyed.
224
        return (self.destroyed is not None) and self.children_deletable
225

226 227 228
    @property
    def children_deletable(self):
        """True if all children of the disk are deletable.
229
        """
230
        return all(i.is_deletable for i in self.derivatives.all())
231

232
    @property
233
    def is_in_use(self):
234
        """True if disk is attached to an active VM.
235 236 237 238

        'In use' means the disk is attached to a VM which is not STOPPED, as
        any other VMs leave the disk in an inconsistent state.
        """
239
        return any(i.state != 'STOPPED' for i in self.instance_set.all())
240

241 242 243 244 245 246 247 248 249 250 251
    def get_appliance(self):
        """Return an Instance or InstanceTemplate object where the disk is used
        """
        instance = self.instance_set.all()
        template = self.template_set.all()
        app = list(instance) + list(template)
        if len(app) > 0:
            return app[0]
        else:
            return None

252 253
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.
254

255 256 257
        This method manipulates the database only.
        """
        type_mapping = {
258 259 260
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
261 262 263 264 265 266
        }

        if self.type not in type_mapping.keys():
            raise self.WrongDiskTypeError(self.type)

        new_type = type_mapping[self.type]
267

268 269 270
        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)
271 272

    def get_vmdisk_desc(self):
273 274
        """Serialize disk object to the vmdriver.
        """
275
        return {
276
            'source': self.path,
277
            'driver_type': self.vm_format,
278
            'driver_cache': 'none',
279
            'target_device': self.device_type + self.dev_num,
280
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
281 282
        }

283
    def get_disk_desc(self):
284 285
        """Serialize disk object to the storage driver.
        """
286 287 288 289 290 291
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': self.base.filename if self.base else None,
292
            'type': 'snapshot' if self.base else 'normal'
293 294
        }

295
    def get_remote_queue_name(self, queue_id='storage', check_worker=True):
296 297
        """Returns the proper queue name based on the datastore.
        """
298
        if self.datastore:
299
            return self.datastore.get_remote_queue_name(queue_id, check_worker)
300 301 302
        else:
            return None

303
    def __unicode__(self):
304
        return u"%s (#%d)" % (self.name, self.id or 0)
305

306
    def clean(self, *args, **kwargs):
Guba Sándor committed
307
        if (self.size is None or "") and self.base:
308 309 310
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

311
    def deploy(self, user=None, task_uuid=None, timeout=15):
312 313 314 315 316
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

317 318 319 320 321 322 323
        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

324 325 326 327
        :return: True if a new reification of the disk has been created;
                 otherwise, False.
        :rtype: bool
        """
328 329 330 331
        if self.destroyed:
            self.destroyed = None
            self.save()

332
        if self.is_ready:
333
            return True
334 335 336 337
        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:

            # Delegate create / snapshot jobs
338
            queue_name = self.get_remote_queue_name('storage')
339
            disk_desc = self.get_disk_desc()
340
            if self.base is not None:
341 342
                with act.sub_activity('creating_snapshot'):
                    remote_tasks.snapshot.apply_async(args=[disk_desc],
343 344
                                                      queue=queue_name
                                                      ).get(timeout=timeout)
345 346 347
            else:
                with act.sub_activity('creating_disk'):
                    remote_tasks.create.apply_async(args=[disk_desc],
348 349
                                                    queue=queue_name
                                                    ).get(timeout=timeout)
350 351

            return True
352

353
    def deploy_async(self, user=None):
354 355
        """Execute deploy asynchronously.
        """
356 357
        return local_tasks.deploy.apply_async(args=[self, user],
                                              queue="localhost.man")
358

359
    @classmethod
360 361 362
    def create(cls, instance=None, user=None, **params):
        """Create disk with activity.
        """
363
        datastore = params.pop('datastore', DataStore.objects.get())
364 365
        filename = params.pop('filename', str(uuid.uuid4()))
        disk = cls(filename=filename, datastore=datastore, **params)
Guba Sándor committed
366
        disk.clean()
367
        disk.save()
Guba Sándor committed
368
        logger.debug("Disk created: %s", params)
369 370 371 372 373
        with disk_activity(code_suffix="create",
                           user=user,
                           disk=disk):
            if instance:
                instance.disks.add(disk)
374
        return disk
375

376
    @classmethod
377 378 379 380 381 382 383 384
    def create_empty_async(cls, instance=None, user=None, **kwargs):
        """Execute deploy asynchronously.
        """
        return local_tasks.create_empty.apply_async(
            args=[cls, instance, user, kwargs], queue="localhost.man")

    @classmethod
    def create_empty(cls, instance=None, user=None, task_uuid=None, **kwargs):
385 386
        """Create empty Disk object.

387 388
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
389
        :param user: Creator of the disk.
390
        :type user: django.contrib.auth.User
391 392

        :return: Disk object without a real image, to be .deploy()ed later.
393
        """
394
        disk = Disk.create(instance, user, **kwargs)
395
        disk.deploy(user=user, task_uuid=task_uuid)
396
        return disk
397 398

    @classmethod
399
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
400
        """Create disk object and download data from url asynchrnously.
401

402 403
        :param url: URL of image to download.
        :type url: string
404 405
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
406 407 408 409 410 411
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
412 413
        kwargs.update({'cls': cls, 'url': url,
                       'instance': instance, 'user': user})
cloud committed
414 415
        return local_tasks.create_from_url.apply_async(
            kwargs=kwargs, queue='localhost.man')
416

417
    @classmethod
418 419
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
420 421 422 423
        """Create disk object and download data from url synchronusly.

        :param url: image url to download.
        :type url: url
424 425
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
426 427
        :param user: owner of the disk
        :type user: django.contrib.auth.User
428 429
        :param task_uuid: UUID of the local task
        :param abortable_task: UUID of the remote running abortable task.
430

431 432
        :return: The created Disk object
        :rtype: Disk
433
        """
434
        kwargs.setdefault('name', url.split('/')[-1])
435
        disk = Disk.create(type="iso", instance=instance, user=user,
436
                           size=None, **kwargs)
437 438
        queue_name = disk.get_remote_queue_name('storage')

439 440 441 442 443 444 445 446 447
        def __on_abort(activity, error):
            activity.disk.destroyed = timezone.now()
            activity.disk.save()

        if abortable_task:
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass
448

449
        with disk_activity(code_suffix='deploy', disk=disk,
450
                           task_uuid=task_uuid, user=user,
451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
                           on_abort=__on_abort) as act:
            with act.sub_activity('downloading_disk'):
                result = remote_tasks.download.apply_async(
                    kwargs={'url': url, 'parent_id': task_uuid,
                            'disk': disk.get_disk_desc()},
                    queue=queue_name)
                while True:
                    try:
                        size = result.get(timeout=5)
                        break
                    except TimeoutError:
                        if abortable_task and abortable_task.is_aborted():
                            AbortableAsyncResult(result.id).abort()
                            raise AbortException("Download aborted by user.")
                disk.size = size
                disk.save()
467
        return disk
468

469
    def destroy(self, user=None, task_uuid=None):
470 471 472
        if self.destroyed:
            return False

473 474 475 476
        with disk_activity(code_suffix='destroy', disk=self,
                           task_uuid=task_uuid, user=user):
            self.destroyed = timezone.now()
            self.save()
477

478
            return True
479

480
    def destroy_async(self, user=None):
481 482
        """Execute destroy asynchronously.
        """
483 484
        return local_tasks.destroy.apply_async(args=[self, user],
                                               queue='localhost.man')
485

486
    def restore(self, user=None, task_uuid=None):
487
        """Recover destroyed disk from trash if possible.
488 489 490 491 492 493 494 495
        """
        # TODO
        pass

    def restore_async(self, user=None):
        local_tasks.restore.apply_async(args=[self, user],
                                        queue='localhost.man')

496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526
    def clone_async(self, new_disk=None, timeout=300, user=None):
        """Clone a Disk to another Disk

        :param new_disk: optional, the new Disk object to clone in
        :type new_disk: storage.models.Disk
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User

        :return: AsyncResult
        """
        return local_tasks.clone.apply_async(args=[self, new_disk,
                                                   timeout, user],
                                             queue="localhost.man")

    def clone(self, disk=None, user=None, task_uuid=None, timeout=300):
        """Cloning Disk into another Disk.

        The Disk.type can'T be snapshot.

        :param new_disk: optional, the new Disk object to clone in
        :type new_disk: storage.models.Disk
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User

        :return: the cloned Disk object.
        """
        banned_types = ['qcow2-snap']
        if self.type in banned_types:
            raise self.WrongDiskTypeError(self.type)
        if self.is_in_use:
            raise self.DiskInUseError(self)
527
        if not self.is_ready:
528 529
            raise self.DiskIsNotReady(self)
        if not disk:
530 531 532
            base = None
            if self.type == "iso":
                base = self
533 534
            disk = Disk.create(datastore=self.datastore,
                               name=self.name, size=self.size,
535
                               type=self.type, base=base)
536 537 538 539 540 541 542 543 544 545 546 547

        with disk_activity(code_suffix="clone", disk=self,
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # Timeout
            return disk

548
    def save_as(self, user=None, task_uuid=None, timeout=300):
549 550
        """Save VM as template.

551 552 553 554
        Based on disk type:
        qcow2-norm, qcow2-snap --> qcow2-norm
        iso                    --> iso (with base)

555 556 557
        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.
        """
558
        mapping = {
559 560 561
            'qcow2-snap': ('qcow2-norm', None),
            'qcow2-norm': ('qcow2-norm', None),
            'iso': ("iso", self),
562 563 564 565
        }
        if self.type not in mapping.keys():
            raise self.WrongDiskTypeError(self.type)

566
        if self.is_in_use:
567 568
            raise self.DiskInUseError(self)

569
        if not self.is_ready:
Guba Sándor committed
570 571
            raise self.DiskIsNotReady(self)

572 573 574
        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

575
        new_type, new_base = mapping[self.type]
576

577 578
        disk = Disk.create(datastore=self.datastore,
                           base=new_base,
579 580
                           name=self.name, size=self.size,
                           type=new_type)
581

582
        with disk_activity(code_suffix="save_as", disk=self,
583 584 585 586 587 588 589 590 591
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # Timeout
            return disk
592 593 594 595 596 597 598 599


class DiskActivity(ActivityModel):
    disk = ForeignKey(Disk, related_name='activity_log',
                      help_text=_('Disk this activity works on.'),
                      verbose_name=_('disk'))

    @classmethod
600
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
601
        act = cls(activity_code='storage.Disk.' + code_suffix,
602
                  disk=disk, parent=None, started=timezone.now(),
603
                  task_uuid=task_uuid, user=user)
604
        act.save()
605
        return act
606

607 608 609 610 611 612 613 614 615
    def __unicode__(self):
        if self.parent:
            return '{}({})->{}'.format(self.parent.activity_code,
                                       self.disk,
                                       self.activity_code)
        else:
            return '{}({})'.format(self.activity_code,
                                   self.disk)

616 617 618
    def create_sub(self, code_suffix, task_uuid=None):
        act = DiskActivity(
            activity_code=self.activity_code + '.' + code_suffix,
619
            disk=self.disk, parent=self, started=timezone.now(),
620 621 622
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act
623

624 625 626
    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        act = self.create_sub(code_suffix, task_uuid)
627
        return activitycontextimpl(act)
628

629

630
@contextmanager
631 632
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
633
    act = DiskActivity.create(code_suffix, disk, task_uuid, user)
634
    return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)
635 636 637 638 639 640 641 642 643


@worker_ready.connect()
def cleanup(conf=None, **kwargs):
    # TODO check if other manager workers are running
    for i in DiskActivity.objects.filter(finished__isnull=True):
        i.finish(False, "Manager is restarted, activity is cleaned up. "
                 "You can try again now.")
        logger.error('Forced finishing stale activity %s', i)