models.py 19.2 KB
Newer Older
1 2
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
3

4
from contextlib import contextmanager
5
import logging
6
from os.path import join
7 8
import uuid

9
from django.db.models import (Model, CharField, DateTimeField,
10
                              ForeignKey)
11
from django.utils import timezone
12
from django.utils.translation import ugettext_lazy as _
13
from model_utils.models import TimeStampedModel
14
from sizefield.models import FileSizeField
15

16
from acl.models import AclBase
17
from .tasks import local_tasks, remote_tasks
18
from celery.exceptions import TimeoutError
19
from manager.mancelery import celery
20
from common.models import (ActivityModel, activitycontextimpl,
21
                           WorkerNotFound)
22 23 24 25

logger = logging.getLogger(__name__)


26
class DataStore(Model):
Guba Sándor committed
27

28 29
    """Collection of virtual disks.
    """
30 31 32 33
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))
Guba Sándor committed
34

35 36 37 38 39 40 41 42
    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

43
    def get_remote_queue_name(self, queue_id, check_worker=True):
44 45
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
46 47
        if not check_worker or local_tasks.check_queue(self.hostname,
                                                       queue_id):
48 49 50
            return self.hostname + '.' + queue_id
        else:
            raise WorkerNotFound()
51

52 53 54
    def get_deletable_disks(self):
        return [disk.filename for disk in
                self.disk_set.filter(
55
                    destroyed__isnull=False) if disk.is_deletable]
56

57

58
class Disk(AclBase, TimeStampedModel):
Guba Sándor committed
59

60 61
    """A virtual disk.
    """
62 63 64 65 66
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
67 68
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
69
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
70 71
    filename = CharField(max_length=256, unique=True,
                         verbose_name=_("filename"))
72 73 74
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
75
    size = FileSizeField(null=True, default=None)
76 77 78
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
    dev_num = CharField(default='a', max_length=1,
79
                        verbose_name=_("device number"))
80
    destroyed = DateTimeField(blank=True, default=None, null=True)
81 82 83 84 85 86

    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')

87 88
    class WrongDiskTypeError(Exception):

89 90 91 92 93 94 95 96
        def __init__(self, type, message=None):
            if message is None:
                message = ("Operation can't be invoked on a disk of type '%s'."
                           % type)

            Exception.__init__(self, message)

            self.type = type
97

98 99
    class DiskInUseError(Exception):

100 101 102 103
        def __init__(self, disk, message=None):
            if message is None:
                message = ("The requested operation can't be performed on "
                           "disk '%s (%s)' because it is in use." %
Dudás Ádám committed
104
                           (disk.name, disk.filename))
105 106 107 108

            Exception.__init__(self, message)

            self.disk = disk
109

Guba Sándor committed
110
    class DiskIsNotReady(Exception):
Guba Sándor committed
111 112
        """ Exception for operations that need a deployed disk.
        """
Guba Sándor committed
113 114 115

        def __init__(self, disk, message=None):
            if message is None:
Guba Sándor committed
116
                message = ("The requested operation can't be performed on "
Guba Sándor committed
117 118 119 120 121 122 123
                           "disk '%s (%s)' because it has never been"
                           "deployed." % (disk.name, disk.filename))

            Exception.__init__(self, message)

            self.disk = disk

124
    @property
125
    def ready(self):
126 127 128 129
        """ Returns True if the disk is physically ready on the storage.

        It needs at least 1 successfull deploy action.
        """
130
        return self.activity_log.filter(activity_code__endswith="deploy",
131 132 133 134 135 136
                                        succeeded=True)

    @property
    def failed(self):
        """ Returns True if the last activity on the disk is failed.
        """
137 138
        result = self.activity_log.all().order_by('-id')[0].succeeded
        return not (result is None) and not result
139

140
    @property
141
    def path(self):
142 143
        """The path where the files are stored.
        """
144
        return join(self.datastore.path, self.filename)
145 146

    @property
147
    def vm_format(self):
148 149
        """Returns the proper file format for different type of images.
        """
150 151 152
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
153
            'iso': 'raw',
154 155 156 157
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

158
    @property
159
    def format(self):
160 161
        """Returns the proper file format for different types of images.
        """
162 163 164 165 166 167 168 169 170
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
            'iso': 'iso',
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

    @property
171
    def device_type(self):
172 173
        """Returns the proper device prefix for different types of images.
        """
174
        return {
175 176
            'qcow2-norm': 'vd',
            'qcow2-snap': 'vd',
177
            'iso': 'hd',
178 179 180
            'raw-ro': 'vd',
            'raw-rw': 'vd',
        }[self.type]
181

182
    def is_downloading(self):
183
        return self.size is None and not self.failed
184 185 186 187

    def get_download_percentage(self):
        if not self.is_downloading():
            return None
188 189 190 191 192 193 194 195
        try:
            task = self.activity_log.filter(
                activity_code__endswith="deploy",
                succeeded__isnull=True)[0].task_uuid
            result = celery.AsyncResult(id=task)
            return result.info.get("percent")
        except:
            return 0
196

197 198 199
    def get_latest_activity_result(self):
        return self.activity_log.latest("pk").result

200
    @property
201
    def is_deletable(self):
202
        """True if the associated file can be deleted.
203
        """
204
        # Check if all children and the disk itself is destroyed.
205
        return (self.destroyed is not None) and self.children_deletable
206

207 208 209
    @property
    def children_deletable(self):
        """True if all children of the disk are deletable.
210
        """
211
        return all(i.is_deletable for i in self.derivatives.all())
212

213
    @property
214
    def is_in_use(self):
215
        """True if disk is attached to an active VM.
216 217 218 219

        'In use' means the disk is attached to a VM which is not STOPPED, as
        any other VMs leave the disk in an inconsistent state.
        """
220
        return any(i.state != 'STOPPED' for i in self.instance_set.all())
221

222 223 224 225 226 227 228 229 230 231 232
    def get_appliance(self):
        """Return an Instance or InstanceTemplate object where the disk is used
        """
        instance = self.instance_set.all()
        template = self.template_set.all()
        app = list(instance) + list(template)
        if len(app) > 0:
            return app[0]
        else:
            return None

233 234
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.
235

236 237 238
        This method manipulates the database only.
        """
        type_mapping = {
239 240 241
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
242 243 244 245 246 247
        }

        if self.type not in type_mapping.keys():
            raise self.WrongDiskTypeError(self.type)

        new_type = type_mapping[self.type]
248

249 250 251
        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)
252 253

    def get_vmdisk_desc(self):
254 255
        """Serialize disk object to the vmdriver.
        """
256
        return {
257
            'source': self.path,
258
            'driver_type': self.vm_format,
259
            'driver_cache': 'none',
260
            'target_device': self.device_type + self.dev_num,
261
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
262 263
        }

264
    def get_disk_desc(self):
265 266
        """Serialize disk object to the storage driver.
        """
267 268 269 270 271 272
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': self.base.filename if self.base else None,
273
            'type': 'snapshot' if self.base else 'normal'
274 275
        }

276
    def get_remote_queue_name(self, queue_id='storage', check_worker=True):
277 278
        """Returns the proper queue name based on the datastore.
        """
279
        if self.datastore:
280
            return self.datastore.get_remote_queue_name(queue_id, check_worker)
281 282 283
        else:
            return None

284
    def __unicode__(self):
285
        return u"%s (#%d)" % (self.name, self.id or 0)
286

287
    def clean(self, *args, **kwargs):
Guba Sándor committed
288
        if (self.size is None or "") and self.base:
289 290 291
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

292
    def deploy(self, user=None, task_uuid=None, timeout=15):
293 294 295 296 297
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

298 299 300 301 302 303 304
        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

305 306 307 308
        :return: True if a new reification of the disk has been created;
                 otherwise, False.
        :rtype: bool
        """
309 310 311 312
        if self.destroyed:
            self.destroyed = None
            self.save()

313
        if self.ready:
314
            return True
315 316 317 318
        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:

            # Delegate create / snapshot jobs
319
            queue_name = self.get_remote_queue_name('storage')
320
            disk_desc = self.get_disk_desc()
321
            if self.base is not None:
322 323
                with act.sub_activity('creating_snapshot'):
                    remote_tasks.snapshot.apply_async(args=[disk_desc],
324 325
                                                      queue=queue_name
                                                      ).get(timeout=timeout)
326 327 328
            else:
                with act.sub_activity('creating_disk'):
                    remote_tasks.create.apply_async(args=[disk_desc],
329 330
                                                    queue=queue_name
                                                    ).get(timeout=timeout)
331 332

            return True
333

334
    def deploy_async(self, user=None):
335 336
        """Execute deploy asynchronously.
        """
337 338
        return local_tasks.deploy.apply_async(args=[self, user],
                                              queue="localhost.man")
339

340
    @classmethod
341 342 343
    def create(cls, instance=None, user=None, **params):
        """Create disk with activity.
        """
344
        datastore = params.pop('datastore', DataStore.objects.get())
345 346
        filename = params.pop('filename', str(uuid.uuid4()))
        disk = cls(filename=filename, datastore=datastore, **params)
Guba Sándor committed
347
        disk.clean()
348
        disk.save()
Guba Sándor committed
349
        logger.debug("Disk created: %s", params)
350 351 352 353 354
        with disk_activity(code_suffix="create",
                           user=user,
                           disk=disk):
            if instance:
                instance.disks.add(disk)
355
        return disk
356

357
    @classmethod
358
    def create_empty(cls, instance=None, user=None, **kwargs):
359 360
        """Create empty Disk object.

361 362
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
363
        :param user: Creator of the disk.
364
        :type user: django.contrib.auth.User
365 366

        :return: Disk object without a real image, to be .deploy()ed later.
367
        """
368
        disk = Disk.create(instance, user, **kwargs)
369
        return disk
370 371

    @classmethod
372
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
373
        """Create disk object and download data from url asynchrnously.
374

375 376
        :param url: URL of image to download.
        :type url: string
377 378
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
379 380 381 382 383 384
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
385 386
        kwargs.update({'cls': cls, 'url': url,
                       'instance': instance, 'user': user})
cloud committed
387 388
        return local_tasks.create_from_url.apply_async(
            kwargs=kwargs, queue='localhost.man')
389

390
    @classmethod
391 392
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
393 394 395 396
        """Create disk object and download data from url synchronusly.

        :param url: image url to download.
        :type url: url
397 398
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
399 400
        :param user: owner of the disk
        :type user: django.contrib.auth.User
401 402
        :param task_uuid: UUID of the local task
        :param abortable_task: UUID of the remote running abortable task.
403

404 405
        :return: The created Disk object
        :rtype: Disk
406
        """
407
        kwargs.setdefault('name', url.split('/')[-1])
408
        disk = Disk.create(type="iso", instance=instance, user=user,
409
                           size=None, **kwargs)
410 411
        queue_name = disk.get_remote_queue_name('storage')

412 413 414 415 416 417 418 419 420
        def __on_abort(activity, error):
            activity.disk.destroyed = timezone.now()
            activity.disk.save()

        if abortable_task:
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass
421

422
        with disk_activity(code_suffix='deploy', disk=disk,
423
                           task_uuid=task_uuid, user=user,
424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
                           on_abort=__on_abort) as act:
            with act.sub_activity('downloading_disk'):
                result = remote_tasks.download.apply_async(
                    kwargs={'url': url, 'parent_id': task_uuid,
                            'disk': disk.get_disk_desc()},
                    queue=queue_name)
                while True:
                    try:
                        size = result.get(timeout=5)
                        break
                    except TimeoutError:
                        if abortable_task and abortable_task.is_aborted():
                            AbortableAsyncResult(result.id).abort()
                            raise AbortException("Download aborted by user.")
                disk.size = size
                disk.save()
440
        return disk
441

442
    def destroy(self, user=None, task_uuid=None):
443 444 445
        if self.destroyed:
            return False

446 447 448 449
        with disk_activity(code_suffix='destroy', disk=self,
                           task_uuid=task_uuid, user=user):
            self.destroyed = timezone.now()
            self.save()
450

451
            return True
452

453
    def destroy_async(self, user=None):
454 455
        """Execute destroy asynchronously.
        """
456 457
        return local_tasks.destroy.apply_async(args=[self, user],
                                               queue='localhost.man')
458

459
    def restore(self, user=None, task_uuid=None):
460
        """Recover destroyed disk from trash if possible.
461 462 463 464 465 466 467 468
        """
        # TODO
        pass

    def restore_async(self, user=None):
        local_tasks.restore.apply_async(args=[self, user],
                                        queue='localhost.man')

469 470 471 472 473
    def save_as_async(self, disk, task_uuid=None, timeout=300, user=None):
        return local_tasks.save_as.apply_async(args=[disk, timeout, user],
                                               queue="localhost.man")

    def save_as(self, user=None, task_uuid=None, timeout=300):
474 475 476 477 478
        """Save VM as template.

        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.
        """
479
        mapping = {
480 481
            'qcow2-snap': 'qcow2-norm',
            'qcow2-norm': 'qcow2-norm',
482 483 484 485
        }
        if self.type not in mapping.keys():
            raise self.WrongDiskTypeError(self.type)

486
        if self.is_in_use:
487 488
            raise self.DiskInUseError(self)

Guba Sándor committed
489 490 491
        if not self.ready:
            raise self.DiskIsNotReady(self)

492 493 494
        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

495
        new_type = mapping[self.type]
496

497
        disk = Disk.create(datastore=self.datastore,
498 499
                           name=self.name, size=self.size,
                           type=new_type)
500

501
        with disk_activity(code_suffix="save_as", disk=self,
502 503 504 505 506 507 508 509 510
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # Timeout
            return disk
511 512 513 514 515 516 517 518


class DiskActivity(ActivityModel):
    disk = ForeignKey(Disk, related_name='activity_log',
                      help_text=_('Disk this activity works on.'),
                      verbose_name=_('disk'))

    @classmethod
519
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
520
        act = cls(activity_code='storage.Disk.' + code_suffix,
521
                  disk=disk, parent=None, started=timezone.now(),
522
                  task_uuid=task_uuid, user=user)
523
        act.save()
524
        return act
525

526 527 528 529 530 531 532 533 534
    def __unicode__(self):
        if self.parent:
            return '{}({})->{}'.format(self.parent.activity_code,
                                       self.disk,
                                       self.activity_code)
        else:
            return '{}({})'.format(self.activity_code,
                                   self.disk)

535 536 537
    def create_sub(self, code_suffix, task_uuid=None):
        act = DiskActivity(
            activity_code=self.activity_code + '.' + code_suffix,
538
            disk=self.disk, parent=self, started=timezone.now(),
539 540 541
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act
542

543 544 545
    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        act = self.create_sub(code_suffix, task_uuid)
546
        return activitycontextimpl(act)
547

548

549
@contextmanager
550 551
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
552
    act = DiskActivity.create(code_suffix, disk, task_uuid, user)
553
    return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)