models.py 21.7 KB
Newer Older
1 2
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
3

4
from contextlib import contextmanager
5
import logging
6
from os.path import join
7 8
import uuid

9
from django.db.models import (Model, CharField, DateTimeField,
10
                              ForeignKey)
11
from django.utils import timezone
12
from django.utils.translation import ugettext_lazy as _
13
from model_utils.models import TimeStampedModel
14
from sizefield.models import FileSizeField
15

16
from acl.models import AclBase
17
from .tasks import local_tasks, remote_tasks
18
from celery.exceptions import TimeoutError
19
from manager.mancelery import celery
20
from common.models import (ActivityModel, activitycontextimpl,
21
                           WorkerNotFound)
22 23 24 25

logger = logging.getLogger(__name__)


26
class DataStore(Model):
Guba Sándor committed
27

28 29
    """Collection of virtual disks.
    """
30 31 32 33
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))
Guba Sándor committed
34

35 36 37 38 39 40 41 42
    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

43
    def get_remote_queue_name(self, queue_id, check_worker=True):
44 45
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
46 47
        if not check_worker or local_tasks.check_queue(self.hostname,
                                                       queue_id):
48 49 50
            return self.hostname + '.' + queue_id
        else:
            raise WorkerNotFound()
51

52 53 54
    def get_deletable_disks(self):
        return [disk.filename for disk in
                self.disk_set.filter(
55
                    destroyed__isnull=False) if disk.is_deletable]
56

57

58
class Disk(AclBase, TimeStampedModel):
Guba Sándor committed
59

60 61
    """A virtual disk.
    """
62 63 64 65 66
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
67 68
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
69
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
70 71
    filename = CharField(max_length=256, unique=True,
                         verbose_name=_("filename"))
72 73 74
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
75
    size = FileSizeField(null=True, default=None)
76 77 78
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
    dev_num = CharField(default='a', max_length=1,
79
                        verbose_name=_("device number"))
80
    destroyed = DateTimeField(blank=True, default=None, null=True)
81 82 83 84 85 86

    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')

87 88
    class WrongDiskTypeError(Exception):

89 90 91 92 93 94 95 96
        def __init__(self, type, message=None):
            if message is None:
                message = ("Operation can't be invoked on a disk of type '%s'."
                           % type)

            Exception.__init__(self, message)

            self.type = type
97

98 99
    class DiskInUseError(Exception):

100 101 102 103
        def __init__(self, disk, message=None):
            if message is None:
                message = ("The requested operation can't be performed on "
                           "disk '%s (%s)' because it is in use." %
Dudás Ádám committed
104
                           (disk.name, disk.filename))
105 106 107 108

            Exception.__init__(self, message)

            self.disk = disk
109

Guba Sándor committed
110
    class DiskIsNotReady(Exception):
Guba Sándor committed
111 112
        """ Exception for operations that need a deployed disk.
        """
Guba Sándor committed
113 114 115

        def __init__(self, disk, message=None):
            if message is None:
Guba Sándor committed
116
                message = ("The requested operation can't be performed on "
Guba Sándor committed
117 118 119 120 121 122 123
                           "disk '%s (%s)' because it has never been"
                           "deployed." % (disk.name, disk.filename))

            Exception.__init__(self, message)

            self.disk = disk

124
    @property
125
    def ready(self):
126 127 128 129
        """ Returns True if the disk is physically ready on the storage.

        It needs at least 1 successfull deploy action.
        """
130
        return self.activity_log.filter(activity_code__endswith="deploy",
131 132 133 134 135 136
                                        succeeded=True)

    @property
    def failed(self):
        """ Returns True if the last activity on the disk is failed.
        """
137 138
        result = self.activity_log.all().order_by('-id')[0].succeeded
        return not (result is None) and not result
139

140
    @property
141
    def path(self):
142 143
        """The path where the files are stored.
        """
144
        return join(self.datastore.path, self.filename)
145 146

    @property
147
    def vm_format(self):
148 149
        """Returns the proper file format for different type of images.
        """
150 151 152
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
153
            'iso': 'raw',
154 155 156 157
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

158
    @property
159
    def format(self):
160 161
        """Returns the proper file format for different types of images.
        """
162 163 164 165 166 167 168 169 170
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
            'iso': 'iso',
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

    @property
171
    def device_type(self):
172 173
        """Returns the proper device prefix for different types of images.
        """
174
        return {
175 176
            'qcow2-norm': 'vd',
            'qcow2-snap': 'vd',
177
            'iso': 'hd',
178 179 180
            'raw-ro': 'vd',
            'raw-rw': 'vd',
        }[self.type]
181

182
    def is_downloading(self):
183
        return self.size is None and not self.failed
184 185 186 187

    def get_download_percentage(self):
        if not self.is_downloading():
            return None
188 189 190 191 192 193 194 195
        try:
            task = self.activity_log.filter(
                activity_code__endswith="deploy",
                succeeded__isnull=True)[0].task_uuid
            result = celery.AsyncResult(id=task)
            return result.info.get("percent")
        except:
            return 0
196

197 198 199
    def get_latest_activity_result(self):
        return self.activity_log.latest("pk").result

200
    @property
201
    def is_deletable(self):
202
        """True if the associated file can be deleted.
203
        """
204
        # Check if all children and the disk itself is destroyed.
205
        return (self.destroyed is not None) and self.children_deletable
206

207 208 209
    @property
    def children_deletable(self):
        """True if all children of the disk are deletable.
210
        """
211
        return all(i.is_deletable for i in self.derivatives.all())
212

213
    @property
214
    def is_in_use(self):
215
        """True if disk is attached to an active VM.
216 217 218 219

        'In use' means the disk is attached to a VM which is not STOPPED, as
        any other VMs leave the disk in an inconsistent state.
        """
220
        return any(i.state != 'STOPPED' for i in self.instance_set.all())
221

222 223 224 225 226 227 228 229 230 231 232
    def get_appliance(self):
        """Return an Instance or InstanceTemplate object where the disk is used
        """
        instance = self.instance_set.all()
        template = self.template_set.all()
        app = list(instance) + list(template)
        if len(app) > 0:
            return app[0]
        else:
            return None

233 234
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.
235

236 237 238
        This method manipulates the database only.
        """
        type_mapping = {
239 240 241
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
242 243 244 245 246 247
        }

        if self.type not in type_mapping.keys():
            raise self.WrongDiskTypeError(self.type)

        new_type = type_mapping[self.type]
248

249 250 251
        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)
252 253

    def get_vmdisk_desc(self):
254 255
        """Serialize disk object to the vmdriver.
        """
256
        return {
257
            'source': self.path,
258
            'driver_type': self.vm_format,
259
            'driver_cache': 'none',
260
            'target_device': self.device_type + self.dev_num,
261
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
262 263
        }

264
    def get_disk_desc(self):
265 266
        """Serialize disk object to the storage driver.
        """
267 268 269 270 271 272
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': self.base.filename if self.base else None,
273
            'type': 'snapshot' if self.base else 'normal'
274 275
        }

276
    def get_remote_queue_name(self, queue_id='storage', check_worker=True):
277 278
        """Returns the proper queue name based on the datastore.
        """
279
        if self.datastore:
280
            return self.datastore.get_remote_queue_name(queue_id, check_worker)
281 282 283
        else:
            return None

284
    def __unicode__(self):
285
        return u"%s (#%d)" % (self.name, self.id or 0)
286

287
    def clean(self, *args, **kwargs):
Guba Sándor committed
288
        if (self.size is None or "") and self.base:
289 290 291
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

292
    def deploy(self, user=None, task_uuid=None, timeout=15):
293 294 295 296 297
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

298 299 300 301 302 303 304
        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

305 306 307 308
        :return: True if a new reification of the disk has been created;
                 otherwise, False.
        :rtype: bool
        """
309 310 311 312
        if self.destroyed:
            self.destroyed = None
            self.save()

313
        if self.ready:
314
            return True
315 316 317 318
        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:

            # Delegate create / snapshot jobs
319
            queue_name = self.get_remote_queue_name('storage')
320
            disk_desc = self.get_disk_desc()
321
            if self.base is not None:
322 323
                with act.sub_activity('creating_snapshot'):
                    remote_tasks.snapshot.apply_async(args=[disk_desc],
324 325
                                                      queue=queue_name
                                                      ).get(timeout=timeout)
326 327 328
            else:
                with act.sub_activity('creating_disk'):
                    remote_tasks.create.apply_async(args=[disk_desc],
329 330
                                                    queue=queue_name
                                                    ).get(timeout=timeout)
331 332

            return True
333

334
    def deploy_async(self, user=None):
335 336
        """Execute deploy asynchronously.
        """
337 338
        return local_tasks.deploy.apply_async(args=[self, user],
                                              queue="localhost.man")
339

340
    @classmethod
341 342 343
    def create(cls, instance=None, user=None, **params):
        """Create disk with activity.
        """
344
        datastore = params.pop('datastore', DataStore.objects.get())
345 346
        filename = params.pop('filename', str(uuid.uuid4()))
        disk = cls(filename=filename, datastore=datastore, **params)
Guba Sándor committed
347
        disk.clean()
348
        disk.save()
Guba Sándor committed
349
        logger.debug("Disk created: %s", params)
350 351 352 353 354
        with disk_activity(code_suffix="create",
                           user=user,
                           disk=disk):
            if instance:
                instance.disks.add(disk)
355
        return disk
356

357
    @classmethod
358 359 360 361 362 363 364 365
    def create_empty_async(cls, instance=None, user=None, **kwargs):
        """Execute deploy asynchronously.
        """
        return local_tasks.create_empty.apply_async(
            args=[cls, instance, user, kwargs], queue="localhost.man")

    @classmethod
    def create_empty(cls, instance=None, user=None, task_uuid=None, **kwargs):
366 367
        """Create empty Disk object.

368 369
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
370
        :param user: Creator of the disk.
371
        :type user: django.contrib.auth.User
372 373

        :return: Disk object without a real image, to be .deploy()ed later.
374
        """
375
        disk = Disk.create(instance, user, **kwargs)
376
        disk.deploy(user=user, task_uuid=task_uuid)
377
        return disk
378 379

    @classmethod
380
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
381
        """Create disk object and download data from url asynchrnously.
382

383 384
        :param url: URL of image to download.
        :type url: string
385 386
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
387 388 389 390 391 392
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
393 394
        kwargs.update({'cls': cls, 'url': url,
                       'instance': instance, 'user': user})
cloud committed
395 396
        return local_tasks.create_from_url.apply_async(
            kwargs=kwargs, queue='localhost.man')
397

398
    @classmethod
399 400
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
401 402 403 404
        """Create disk object and download data from url synchronusly.

        :param url: image url to download.
        :type url: url
405 406
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
407 408
        :param user: owner of the disk
        :type user: django.contrib.auth.User
409 410
        :param task_uuid: UUID of the local task
        :param abortable_task: UUID of the remote running abortable task.
411

412 413
        :return: The created Disk object
        :rtype: Disk
414
        """
415
        kwargs.setdefault('name', url.split('/')[-1])
416
        disk = Disk.create(type="iso", instance=instance, user=user,
417
                           size=None, **kwargs)
418 419
        queue_name = disk.get_remote_queue_name('storage')

420 421 422 423 424 425 426 427 428
        def __on_abort(activity, error):
            activity.disk.destroyed = timezone.now()
            activity.disk.save()

        if abortable_task:
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass
429

430
        with disk_activity(code_suffix='deploy', disk=disk,
431
                           task_uuid=task_uuid, user=user,
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
                           on_abort=__on_abort) as act:
            with act.sub_activity('downloading_disk'):
                result = remote_tasks.download.apply_async(
                    kwargs={'url': url, 'parent_id': task_uuid,
                            'disk': disk.get_disk_desc()},
                    queue=queue_name)
                while True:
                    try:
                        size = result.get(timeout=5)
                        break
                    except TimeoutError:
                        if abortable_task and abortable_task.is_aborted():
                            AbortableAsyncResult(result.id).abort()
                            raise AbortException("Download aborted by user.")
                disk.size = size
                disk.save()
448
        return disk
449

450
    def destroy(self, user=None, task_uuid=None):
451 452 453
        if self.destroyed:
            return False

454 455 456 457
        with disk_activity(code_suffix='destroy', disk=self,
                           task_uuid=task_uuid, user=user):
            self.destroyed = timezone.now()
            self.save()
458

459
            return True
460

461
    def destroy_async(self, user=None):
462 463
        """Execute destroy asynchronously.
        """
464 465
        return local_tasks.destroy.apply_async(args=[self, user],
                                               queue='localhost.man')
466

467
    def restore(self, user=None, task_uuid=None):
468
        """Recover destroyed disk from trash if possible.
469 470 471 472 473 474 475 476
        """
        # TODO
        pass

    def restore_async(self, user=None):
        local_tasks.restore.apply_async(args=[self, user],
                                        queue='localhost.man')

477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525
    def clone_async(self, new_disk=None, timeout=300, user=None):
        """Clone a Disk to another Disk

        :param new_disk: optional, the new Disk object to clone in
        :type new_disk: storage.models.Disk
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User

        :return: AsyncResult
        """
        return local_tasks.clone.apply_async(args=[self, new_disk,
                                                   timeout, user],
                                             queue="localhost.man")

    def clone(self, disk=None, user=None, task_uuid=None, timeout=300):
        """Cloning Disk into another Disk.

        The Disk.type can'T be snapshot.

        :param new_disk: optional, the new Disk object to clone in
        :type new_disk: storage.models.Disk
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User

        :return: the cloned Disk object.
        """
        banned_types = ['qcow2-snap']
        if self.type in banned_types:
            raise self.WrongDiskTypeError(self.type)
        if self.is_in_use:
            raise self.DiskInUseError(self)
        if not self.ready:
            raise self.DiskIsNotReady(self)
        if not disk:
            disk = Disk.create(datastore=self.datastore,
                               name=self.name, size=self.size,
                               type=self.type)

        with disk_activity(code_suffix="clone", disk=self,
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # Timeout
            return disk

526 527 528 529 530
    def save_as_async(self, disk, task_uuid=None, timeout=300, user=None):
        return local_tasks.save_as.apply_async(args=[disk, timeout, user],
                                               queue="localhost.man")

    def save_as(self, user=None, task_uuid=None, timeout=300):
531 532
        """Save VM as template.

533 534 535 536
        Based on disk type:
        qcow2-norm, qcow2-snap --> qcow2-norm
        iso                    --> iso (with base)

537 538 539
        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.
        """
540
        mapping = {
541 542 543
            'qcow2-snap': ('qcow2-norm', None),
            'qcow2-norm': ('qcow2-norm', None),
            'iso': ("iso", self),
544 545 546 547
        }
        if self.type not in mapping.keys():
            raise self.WrongDiskTypeError(self.type)

548
        if self.is_in_use:
549 550
            raise self.DiskInUseError(self)

Guba Sándor committed
551 552 553
        if not self.ready:
            raise self.DiskIsNotReady(self)

554 555 556
        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

557
        new_type, new_base = mapping[self.type]
558

559 560
        disk = Disk.create(datastore=self.datastore,
                           base=new_base,
561 562
                           name=self.name, size=self.size,
                           type=new_type)
563

564
        with disk_activity(code_suffix="save_as", disk=self,
565 566 567 568 569 570 571 572 573
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # Timeout
            return disk
574 575 576 577 578 579 580 581


class DiskActivity(ActivityModel):
    disk = ForeignKey(Disk, related_name='activity_log',
                      help_text=_('Disk this activity works on.'),
                      verbose_name=_('disk'))

    @classmethod
582
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
583
        act = cls(activity_code='storage.Disk.' + code_suffix,
584
                  disk=disk, parent=None, started=timezone.now(),
585
                  task_uuid=task_uuid, user=user)
586
        act.save()
587
        return act
588

589 590 591 592 593 594 595 596 597
    def __unicode__(self):
        if self.parent:
            return '{}({})->{}'.format(self.parent.activity_code,
                                       self.disk,
                                       self.activity_code)
        else:
            return '{}({})'.format(self.activity_code,
                                   self.disk)

598 599 600
    def create_sub(self, code_suffix, task_uuid=None):
        act = DiskActivity(
            activity_code=self.activity_code + '.' + code_suffix,
601
            disk=self.disk, parent=self, started=timezone.now(),
602 603 604
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act
605

606 607 608
    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        act = self.create_sub(code_suffix, task_uuid)
609
        return activitycontextimpl(act)
610

611

612
@contextmanager
613 614
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
615
    act = DiskActivity.create(code_suffix, disk, task_uuid, user)
616
    return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)