models.py 22.9 KB
Newer Older
1 2
# -*- coding: utf-8 -*-

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

20
from __future__ import unicode_literals
21

22
from contextlib import contextmanager
23
import logging
24
from os.path import join
25 26
import uuid

27
from celery.signals import worker_ready
28
from django.db.models import (Model, CharField, DateTimeField,
29
                              ForeignKey)
30
from django.utils import timezone
31
from django.utils.translation import ugettext_lazy as _
32
from model_utils.models import TimeStampedModel
33
from sizefield.models import FileSizeField
34

35
from acl.models import AclBase
36
from .tasks import local_tasks, remote_tasks
37
from celery.exceptions import TimeoutError
38
from manager.mancelery import celery
39
from common.models import (ActivityModel, activitycontextimpl,
40
                           WorkerNotFound)
41 42 43 44

logger = logging.getLogger(__name__)


45
class DataStore(Model):

    """Collection of virtual disks.

    Stores a unique name, the path under which disk files are kept and the
    hostname that prefixes the names of the queues serving this datastore.
    """
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))

    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

    def get_remote_queue_name(self, queue_id, priority=None,
                              check_worker=True):
        """Return the queue name ``<hostname>.<queue_id>``.

        :param queue_id: suffix of the queue name.
        :param priority: passed through to ``local_tasks.check_queue``.
        :param check_worker: when true, the queue is checked with
                             ``local_tasks.check_queue`` first.

        :raises WorkerNotFound: if the check is requested and fails.
        """
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
        if check_worker and not local_tasks.check_queue(
                self.hostname, queue_id, priority):
            raise WorkerNotFound()
        return self.hostname + '.' + queue_id

    def get_deletable_disks(self):
        """Return the filenames of the destroyed disks that are deletable.
        """
        destroyed_disks = self.disk_set.filter(destroyed__isnull=False)
        return [disk.filename for disk in destroyed_disks
                if disk.is_deletable]
76

77

78
class Disk(AclBase, TimeStampedModel):

    """A virtual disk.

    The model row describes a disk image file stored in a DataStore; the
    methods of the class create, deploy, clone and destroy that image.
    """
    # Access levels offered for a disk.
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
    # (stored value, human readable label) pairs accepted by ``type``.
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
    # File name inside the datastore; ``path`` joins it to datastore.path.
    filename = CharField(max_length=256, unique=True,
                         verbose_name=_("filename"))
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
    # None until a size is known; is_downloading() relies on this.
    size = FileSizeField(null=True, default=None)
    # Disk this one is derived from; reverse accessor is ``derivatives``.
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
    # Appended to the device prefix in get_vmdisk_desc().
    dev_num = CharField(default='a', max_length=1,
                        verbose_name=_("device number"))
    # Set to the current time by destroy(); None while the disk is live.
    destroyed = DateTimeField(blank=True, default=None, null=True)

    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')

107 108
    class WrongDiskTypeError(Exception):

        """Raised when an operation is not available for a disk type.

        The offending type is kept in the ``type`` attribute.
        """

        def __init__(self, type, message=None):
            default = "Operation can't be invoked on a disk of type '%s'."
            Exception.__init__(
                self, default % type if message is None else message)
            self.type = type
117

118 119
    class DiskInUseError(Exception):

        """Raised when an operation is refused because the disk is in use.

        The disk is kept in the ``disk`` attribute.
        """

        def __init__(self, disk, message=None):
            if message is not None:
                text = message
            else:
                text = ("The requested operation can't be performed on "
                        "disk '%s (%s)' because it is in use." %
                        (disk.name, disk.filename))
            Exception.__init__(self, text)
            self.disk = disk
129

Guba Sándor committed
130
    class DiskIsNotReady(Exception):

        """ Exception for operations that need a deployed disk.

        The disk is kept in the ``disk`` attribute.
        """

        def __init__(self, disk, message=None):
            if message is None:
                # The trailing space after "been" is required: adjacent
                # literals are concatenated without any separator.
                message = ("The requested operation can't be performed on "
                           "disk '%s (%s)' because it has never been "
                           "deployed." % (disk.name, disk.filename))

            Exception.__init__(self, message)

            self.disk = disk

144
    @property
    def is_ready(self):
        """ Returns True if the disk is physically ready on the storage.

        It needs at least 1 successful deploy action, i.e. an activity whose
        code ends with "deploy" and whose ``succeeded`` flag is True.

        :rtype: bool
        """
        # exists() yields a real boolean and avoids fetching every matching
        # activity just to test the queryset for truth.
        return self.activity_log.filter(activity_code__endswith="deploy",
                                        succeeded=True).exists()

    @property
    def failed(self):
        """ Returns True if the last activity on the disk is failed.

        The last activity is the one with the greatest id.  An activity whose
        ``succeeded`` is still None does not count as failed, and a disk
        without any activity is not failed either.

        :rtype: bool
        """
        try:
            result = self.activity_log.all().order_by('-id')[0].succeeded
        except IndexError:
            # No activity has been recorded for this disk yet.
            return False
        return result is not None and not result
159

160
    @property
    def path(self):
        """Path of the disk's file: datastore path joined with the filename.
        """
        directory = self.datastore.path
        return join(directory, self.filename)
165 166

    @property
    def vm_format(self):
        """File format reported to the VM driver for this disk type.

        Both qcow2 types map to 'qcow2'; iso and the raw types map to 'raw'.
        An unknown type raises KeyError.
        """
        formats = dict.fromkeys(('qcow2-norm', 'qcow2-snap'), 'qcow2')
        formats.update(dict.fromkeys(('iso', 'raw-ro', 'raw-rw'), 'raw'))
        return formats[self.type]

178
    @property
    def format(self):
        """File format reported to the storage driver for this disk type.

        Same mapping as ``vm_format`` except that an iso stays 'iso'.
        An unknown type raises KeyError.
        """
        formats = dict.fromkeys(('qcow2-norm', 'qcow2-snap'), 'qcow2')
        formats.update(dict.fromkeys(('raw-ro', 'raw-rw'), 'raw'))
        formats['iso'] = 'iso'
        return formats[self.type]

    @property
    def device_type(self):
        """Device name prefix for this disk type.

        'hd' for iso images and 'vd' for every other known type.
        An unknown type raises KeyError.
        """
        prefixes = dict.fromkeys(
            ('qcow2-norm', 'qcow2-snap', 'raw-ro', 'raw-rw'), 'vd')
        prefixes['iso'] = 'hd'
        return prefixes[self.type]
201

202
    def is_downloading(self):
        """True while the size is still unknown and the disk has not failed.
        """
        if self.size is not None:
            return False
        return not self.failed
204 205 206 207

    def get_download_percentage(self):
        """Return the progress of the running download.

        :return: None if the disk is not downloading; otherwise the
                 "percent" entry of the info reported by the task of the
                 first unfinished deploy activity, or 0 if that can not be
                 determined.
        """
        if not self.is_downloading():
            return None
        try:
            task = self.activity_log.filter(
                activity_code__endswith="deploy",
                succeeded__isnull=True)[0].task_uuid
            result = celery.AsyncResult(id=task)
            return result.info.get("percent")
        except Exception:
            # Best effort: a missing activity or an info object without
            # progress data is reported as 0.  Unlike a bare except this
            # lets KeyboardInterrupt and SystemExit propagate.
            return 0
216

217 218 219
    def get_latest_activity_result(self):
        """Return the ``result`` of ``activity_log.latest("pk")``.
        """
        newest = self.activity_log.latest("pk")
        return newest.result

220
    @property
    def is_deletable(self):
        """True if the associated file can be deleted.

        That requires the disk itself to be destroyed and all of its
        children to be deletable.
        """
        if self.destroyed is None:
            return False
        return self.children_deletable
226

227 228 229
    @property
    def children_deletable(self):
        """True if every disk in ``derivatives`` is deletable.

        A disk without derivatives yields True.
        """
        for child in self.derivatives.all():
            if not child.is_deletable:
                return False
        return True
232

233
    @property
    def is_in_use(self):
        """True if disk is attached to an active VM.

        A disk counts as 'in use' when at least one instance in
        ``instance_set`` has a state other than STOPPED, since a VM in any
        other state leaves the disk in an inconsistent state.
        """
        for instance in self.instance_set.all():
            if instance.state != 'STOPPED':
                return True
        return False
241

242 243 244 245 246 247 248 249 250 251 252
    def get_appliance(self):
        """Return an Instance or InstanceTemplate object where the disk is used

        Instances are considered before templates; None is returned when the
        disk is used by neither.
        """
        instances, templates = self.instance_set.all(), self.template_set.all()
        candidates = list(instances)
        candidates.extend(templates)
        return candidates[0] if candidates else None

253 254
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.

        A new Disk based on this one is created with the type the mapping
        below assigns to this disk's type.

        This method manipulates the database only.

        :raises WrongDiskTypeError: if this disk's type is not in the mapping.
        """
        exclusive_types = {
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
        }
        new_type = exclusive_types.get(self.type)
        if new_type is None:
            raise self.WrongDiskTypeError(self.type)

        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)
272 273

    def get_vmdisk_desc(self):
        """Serialize disk object to the vmdriver.

        :return: dict with the source path, driver type, cache mode, target
                 device name and device kind ('cdrom' for iso, else 'disk').
        """
        device_kind = 'cdrom' if self.type == 'iso' else 'disk'
        target = self.device_type + self.dev_num
        return {
            'source': self.path,
            'driver_type': self.vm_format,
            'driver_cache': 'none',
            'target_device': target,
            'disk_device': device_kind,
        }

284
    def get_disk_desc(self):
        """Serialize disk object to the storage driver.

        :return: dict with the file name, datastore directory, format, size
                 and, when the disk has a base, the base's file name; 'type'
                 is 'snapshot' for disks with a base and 'normal' otherwise.
        """
        base = self.base
        if base:
            base_name, kind = base.filename, 'snapshot'
        else:
            base_name, kind = None, 'normal'
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': base_name,
            'type': kind,
        }

296 297
    def get_remote_queue_name(self, queue_id='storage', priority=None,
                              check_worker=True):
        """Returns the proper queue name based on the datastore.

        The call is delegated to the datastore with the same arguments;
        None is returned when the disk has no datastore.
        """
        store = self.datastore
        if not store:
            return None
        return store.get_remote_queue_name(queue_id, priority, check_worker)

306
    def __unicode__(self):
        """Return the disk's name followed by its id (0 if it has none)."""
        disk_id = self.id or 0
        return u"%s (#%d)" % (self.name, disk_id)
308

309
    def clean(self, *args, **kwargs):
        """Inherit the size of the base disk when no size is set.

        A size of None or the empty string counts as unset; the size is only
        copied when the disk has a base.  The parent class's clean() is
        called afterwards with the same arguments.
        """
        # The former test ``self.size is None or ""`` never matched the empty
        # string, because ``or ""`` only appends an always-false operand.
        if self.size in (None, "") and self.base:
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

314
    def deploy(self, user=None, task_uuid=None, timeout=15):
        """Reify the disk model on the associated data store.

        A destroyed disk is un-destroyed first.  If the disk is already
        ready nothing else happens; otherwise a snapshot job (disk with a
        base) or a create job (disk without a base) is sent to the storage
        queue and waited for inside a 'deploy' activity.

        :param self: the disk model to reify
        :type self: storage.models.Disk

        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

        :param timeout: passed to ``get()`` when waiting for the remote job.

        :return: True, both when the disk was already ready and when the
                 remote job has been waited for.
        :rtype: bool
        """
        if self.destroyed:
            self.destroyed = None
            self.save()

        if self.is_ready:
            return True

        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:
            # Delegate create / snapshot jobs
            queue_name = self.get_remote_queue_name('storage')
            disk_desc = self.get_disk_desc()
            if self.base is None:
                step, job = 'creating_disk', remote_tasks.create
            else:
                step, job = 'creating_snapshot', remote_tasks.snapshot
            with act.sub_activity(step):
                pending = job.apply_async(args=[disk_desc], queue=queue_name)
                pending.get(timeout=timeout)

            return True
355

356
    def deploy_async(self, user=None):
        """Execute deploy asynchronously.

        :return: what ``apply_async`` returns for ``local_tasks.deploy``
                 queued on "localhost.man".
        """
        task = local_tasks.deploy
        return task.apply_async(args=[self, user], queue="localhost.man")
361

362
    @classmethod
    def create(cls, instance=None, user=None, **params):
        """Create disk with activity.

        :param instance: optional object whose ``disks`` relation the new
                         disk is added to.
        :param user: user recorded on the 'create' activity.
        :param params: field values of the new disk.  ``datastore`` defaults
                       to the only DataStore in the database and ``filename``
                       to a freshly generated UUID string.

        :return: the saved Disk object.
        """
        # Resolve the defaults lazily.  As a default argument of dict.pop()
        # the DataStore lookup ran on every call, so it hit the database and
        # could raise even though the caller supplied a datastore.
        if 'datastore' in params:
            datastore = params.pop('datastore')
        else:
            datastore = DataStore.objects.get()
        if 'filename' in params:
            filename = params.pop('filename')
        else:
            filename = str(uuid.uuid4())
        disk = cls(filename=filename, datastore=datastore, **params)
        disk.clean()
        disk.save()
        logger.debug("Disk created: %s", params)
        with disk_activity(code_suffix="create",
                           user=user,
                           disk=disk):
            if instance:
                instance.disks.add(disk)
        return disk
378

379
    @classmethod
    def create_empty_async(cls, instance=None, user=None, **kwargs):
        """Execute create_empty asynchronously.

        :return: what ``apply_async`` returns for ``local_tasks.create_empty``
                 queued on "localhost.man".
        """
        task_args = [cls, instance, user, kwargs]
        return local_tasks.create_empty.apply_async(
            args=task_args, queue="localhost.man")

    @classmethod
    def create_empty(cls, instance=None, user=None, task_uuid=None, **kwargs):
        """Create an empty Disk object and deploy it.

        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User
        :param task_uuid: passed on to ``deploy``.

        :return: the created Disk object, after ``deploy`` has been called
                 on it.
        """
        new_disk = Disk.create(instance, user, **kwargs)
        new_disk.deploy(user=user, task_uuid=task_uuid)
        return new_disk
400 401

    @classmethod
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
        """Create disk object and download data from url asynchronously.

        :param url: URL of image to download.
        :type url: string
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
        task_kwargs = kwargs
        for key, value in (('cls', cls), ('url', url),
                           ('instance', instance), ('user', user)):
            task_kwargs[key] = value
        return local_tasks.create_from_url.apply_async(
            kwargs=task_kwargs, queue='localhost.man')
419

420
    @classmethod
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
        """Create disk object and download data from url synchronously.

        The disk is created with type "iso" and no size; the size returned
        by the remote download task is stored once the download finishes.

        :param url: image url to download.
        :type url: url
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
        :param user: owner of the disk
        :type user: django.contrib.auth.User
        :param task_uuid: UUID of the local task
        :param abortable_task: object whose ``is_aborted()`` is polled while
                               waiting; when it reports an abort, the remote
                               download is aborted as well.
                               NOTE(review): the former description called
                               this a UUID -- confirm with the caller.

        :return: The created Disk object
        :rtype: Disk
        """
        # The last path segment of the URL is the default disk name.
        kwargs.setdefault('name', url.split('/')[-1])
        disk = Disk.create(type="iso", instance=instance, user=user,
                           size=None, **kwargs)
        queue_name = disk.get_remote_queue_name('storage')

        # Handed to disk_activity as on_abort: marks the disk as destroyed.
        def __on_abort(activity, error):
            activity.disk.destroyed = timezone.now()
            activity.disk.save()

        # These names are only needed -- and only defined -- when the caller
        # supplied an abortable task; the loop below guards their use with
        # the same condition.
        if abortable_task:
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass

        with disk_activity(code_suffix='deploy', disk=disk,
                           task_uuid=task_uuid, user=user,
                           on_abort=__on_abort) as act:
            with act.sub_activity('downloading_disk'):
                result = remote_tasks.download.apply_async(
                    kwargs={'url': url, 'parent_id': task_uuid,
                            'disk': disk.get_disk_desc()},
                    queue=queue_name)
                # Wait in 5 second slices so that an abort request can be
                # noticed between two attempts.
                while True:
                    try:
                        size = result.get(timeout=5)
                        break
                    except TimeoutError:
                        if abortable_task and abortable_task.is_aborted():
                            AbortableAsyncResult(result.id).abort()
                            raise AbortException("Download aborted by user.")
                disk.size = size
                disk.save()
        return disk
471

472
    def destroy(self, user=None, task_uuid=None):
        """Mark the disk as destroyed.

        Only the ``destroyed`` timestamp is set and saved, inside a
        'destroy' activity.

        :return: False if the disk was already destroyed, True otherwise.
        """
        already_destroyed = bool(self.destroyed)
        if already_destroyed:
            return False

        with disk_activity(code_suffix='destroy', disk=self,
                           task_uuid=task_uuid, user=user):
            self.destroyed = timezone.now()
            self.save()

            return True
482

483
    def destroy_async(self, user=None):
        """Execute destroy asynchronously.

        :return: what ``apply_async`` returns for ``local_tasks.destroy``
                 queued on 'localhost.man'.
        """
        task = local_tasks.destroy
        return task.apply_async(args=[self, user], queue='localhost.man')
488

489
    def restore(self, user=None, task_uuid=None):
        """Recover destroyed disk from trash if possible.

        Not implemented yet: the method does nothing and returns None.
        """
        # TODO
        return None

    def restore_async(self, user=None):
        """Execute restore asynchronously.

        :return: what ``apply_async`` returns for ``local_tasks.restore``
                 queued on 'localhost.man'.  It is returned so that callers
                 can track the task, as with the other ``*_async`` methods.
        """
        return local_tasks.restore.apply_async(args=[self, user],
                                               queue='localhost.man')

499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529
    def clone_async(self, new_disk=None, timeout=300, user=None):
        """Clone a Disk to another Disk asynchronously.

        :param new_disk: optional, the new Disk object to clone in
        :type new_disk: storage.models.Disk
        :param timeout: forwarded to the task as its third argument.
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User

        :return: AsyncResult
        """
        task_args = [self, new_disk, timeout, user]
        return local_tasks.clone.apply_async(args=task_args,
                                             queue="localhost.man")

    def clone(self, disk=None, user=None, task_uuid=None, timeout=300):
        """Cloning Disk into another Disk.

        The Disk.type can't be snapshot ('qcow2-snap').

        :param disk: optional, the new Disk object to clone in; when omitted
                     a new Disk with the same datastore, name, size and type
                     is created (an iso clone uses this disk as its base).
        :type disk: storage.models.Disk
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User
        :param task_uuid: task UUID recorded on the activities.
        :param timeout: currently unused; the remote merge is waited for
                        without a timeout.

        :raises WrongDiskTypeError: if this disk is a snapshot.
        :raises DiskInUseError: if this disk is in use.
        :raises DiskIsNotReady: if this disk is not ready.

        :return: the cloned Disk object.
        """
        banned_types = ['qcow2-snap']
        if self.type in banned_types:
            raise self.WrongDiskTypeError(self.type)
        if self.is_in_use:
            raise self.DiskInUseError(self)
        if not self.is_ready:
            raise self.DiskIsNotReady(self)
        if not disk:
            base = None
            if self.type == "iso":
                base = self
            disk = Disk.create(datastore=self.datastore,
                               name=self.name, size=self.size,
                               type=self.type, base=base)

        # The outer activity is logged on the source disk, the inner one on
        # the target disk as a 'deploy'.
        with disk_activity(code_suffix="clone", disk=self,
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # TODO: apply timeout
            return disk

551
    def save_as(self, user=None, task_uuid=None, timeout=300):
        """Save the disk as a new Disk, e.g. when saving a VM as template.

        Based on disk type:
        qcow2-norm, qcow2-snap --> qcow2-norm
        iso                    --> iso (with base)

        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.

        :param user: user recorded on the activities.
        :param task_uuid: task UUID recorded on the activities.

        :raises WrongDiskTypeError: if the disk's type is not listed above.
        :raises DiskInUseError: if this disk is in use.
        :raises DiskIsNotReady: if this disk is not ready.

        :return: the newly created Disk object.
        """
        # disk type -> (type of the new disk, base of the new disk)
        mapping = {
            'qcow2-snap': ('qcow2-norm', None),
            'qcow2-norm': ('qcow2-norm', None),
            'iso': ("iso", self),
        }
        if self.type not in mapping.keys():
            raise self.WrongDiskTypeError(self.type)

        if self.is_in_use:
            raise self.DiskInUseError(self)

        if not self.is_ready:
            raise self.DiskIsNotReady(self)

        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

        new_type, new_base = mapping[self.type]

        disk = Disk.create(datastore=self.datastore,
                           base=new_base,
                           name=self.name, size=self.size,
                           type=new_type)

        # The outer activity is logged on the source disk, the inner one on
        # the new disk as a 'deploy'.
        with disk_activity(code_suffix="save_as", disk=self,
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # Timeout
            return disk
595 596 597 598 599 600 601 602


class DiskActivity(ActivityModel):

    """Activity recorded for an operation on a Disk.

    Activities of a disk are reachable through ``disk.activity_log``.
    """
    disk = ForeignKey(Disk, related_name='activity_log',
                      help_text=_('Disk this activity works on.'),
                      verbose_name=_('disk'))

    @classmethod
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
        """Create and save a top-level activity for ``disk``.

        The activity code is 'storage.Disk.' followed by ``code_suffix``.
        """
        code = 'storage.Disk.' + code_suffix
        activity = cls(activity_code=code, disk=disk, parent=None,
                       started=timezone.now(), task_uuid=task_uuid,
                       user=user)
        activity.save()
        return activity

    def __unicode__(self):
        if not self.parent:
            return '{}({})'.format(self.activity_code,
                                   self.disk)
        return '{}({})->{}'.format(self.parent.activity_code,
                                   self.disk,
                                   self.activity_code)

    def create_sub(self, code_suffix, task_uuid=None):
        """Create and save a child activity of this one.

        The child shares the disk and the user of its parent; its code is
        the parent's code, a dot and ``code_suffix``.
        """
        code = self.activity_code + '.' + code_suffix
        child = DiskActivity(
            activity_code=code, disk=self.disk, parent=self,
            started=timezone.now(), task_uuid=task_uuid, user=self.user)
        child.save()
        return child

    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        """Context manager running its body as a sub-activity.

        The function does not yield itself: it hands back the result of
        ``activitycontextimpl`` for ``contextmanager`` to drive
        (presumably a generator -- see common.models).
        """
        child = self.create_sub(code_suffix, task_uuid)
        return activitycontextimpl(child)
631

632

633
@contextmanager
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
    """Context manager running its body as a new activity of ``disk``.

    A DiskActivity is created first; it is then passed with the two optional
    callbacks to ``activitycontextimpl``, whose result ``contextmanager``
    drives (presumably a generator -- see common.models).
    """
    activity = DiskActivity.create(code_suffix=code_suffix, disk=disk,
                                   task_uuid=task_uuid, user=user)
    return activitycontextimpl(activity, on_abort=on_abort,
                               on_commit=on_commit)
638 639 640 641 642 643 644 645 646


@worker_ready.connect()
def cleanup(conf=None, **kwargs):
    """Finish, as failed, every disk activity that has no finish time.

    Connected to celery's ``worker_ready`` signal.
    """
    # TODO check if other manager workers are running
    stale_activities = DiskActivity.objects.filter(finished__isnull=True)
    for activity in stale_activities:
        activity.finish(False, "Manager is restarted, activity is cleaned up. "
                        "You can try again now.")
        logger.error('Forced finishing stale activity %s', activity)