models.py 17.9 KB
Newer Older
1 2
# coding=utf-8

3
from contextlib import contextmanager
4
import logging
5
from os.path import join
6 7
import uuid

8
from django.db.models import (Model, CharField, DateTimeField,
9
                              ForeignKey)
10
from django.utils import timezone
11
from django.utils.translation import ugettext_lazy as _
12
from model_utils.models import TimeStampedModel
13
from sizefield.models import FileSizeField
14

15
from acl.models import AclBase
16
from .tasks import local_tasks, remote_tasks
17
from celery.exceptions import TimeoutError
18
from manager.mancelery import celery
19
from common.models import (ActivityModel, activitycontextimpl,
20
                           WorkerNotFound)
21 22 23 24

logger = logging.getLogger(__name__)


25
class DataStore(Model):
Guba Sándor committed
26

27 28
    """Collection of virtual disks.
    """
29 30 31 32
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))
Guba Sándor committed
33

34 35 36 37 38 39 40 41
    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

42
    def get_remote_queue_name(self, queue_id, check_worker=True):
43 44
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
45 46
        if not check_worker or local_tasks.check_queue(self.hostname,
                                                       queue_id):
47 48 49
            return self.hostname + '.' + queue_id
        else:
            raise WorkerNotFound()
50

51 52 53
    def get_deletable_disks(self):
        return [disk.filename for disk in
                self.disk_set.filter(
54
                    destroyed__isnull=False) if disk.is_deletable]
55

56

57
class Disk(AclBase, TimeStampedModel):
Guba Sándor committed
58

59 60
    """A virtual disk.
    """
61 62 63 64 65
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
66 67
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
68
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
69 70
    filename = CharField(max_length=256, unique=True,
                         verbose_name=_("filename"))
71 72 73
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
74
    size = FileSizeField(null=True, default=None)
75 76 77
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
    dev_num = CharField(default='a', max_length=1,
78
                        verbose_name=_("device number"))
79
    destroyed = DateTimeField(blank=True, default=None, null=True)
80 81 82 83 84 85

    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')

86 87
    class WrongDiskTypeError(Exception):

88 89 90 91 92 93 94 95
        def __init__(self, type, message=None):
            if message is None:
                message = ("Operation can't be invoked on a disk of type '%s'."
                           % type)

            Exception.__init__(self, message)

            self.type = type
96

97 98
    class DiskInUseError(Exception):

99 100 101 102
        def __init__(self, disk, message=None):
            if message is None:
                message = ("The requested operation can't be performed on "
                           "disk '%s (%s)' because it is in use." %
Dudás Ádám committed
103
                           (disk.name, disk.filename))
104 105 106 107

            Exception.__init__(self, message)

            self.disk = disk
108

109
    @property
110 111 112 113
    def ready(self):
        return self.activity_log.filter(activity_code__endswith="deploy",
                                        succeeded__isnull=False)

114
    @property
115
    def path(self):
116 117
        """The path where the files are stored.
        """
118
        return join(self.datastore.path, self.filename)
119 120

    @property
121
    def vm_format(self):
122 123
        """Returns the proper file format for different type of images.
        """
124 125 126
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
127
            'iso': 'raw',
128 129 130 131
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

132
    @property
133
    def format(self):
134 135
        """Returns the proper file format for different types of images.
        """
136 137 138 139 140 141 142 143 144
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
            'iso': 'iso',
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

    @property
145
    def device_type(self):
146 147
        """Returns the proper device prefix for different types of images.
        """
148
        return {
149 150
            'qcow2-norm': 'vd',
            'qcow2-snap': 'vd',
151
            'iso': 'hd',
152 153 154
            'raw-ro': 'vd',
            'raw-rw': 'vd',
        }[self.type]
155

156
    def is_downloading(self):
157 158 159
        return self.activity_log.filter(
            activity_code__endswith="downloading_disk",
            succeeded__isnull=True)
160 161 162 163

    def get_download_percentage(self):
        if not self.is_downloading():
            return None
164 165 166
        task = self.activity_log.filter(
            activity_code__endswith="deploy",
            succeeded__isnull=True)[0].task_uuid
167 168 169
        result = celery.AsyncResult(id=task)
        return result.info.get("percent")

170
    @property
171
    def is_deletable(self):
172
        """True if the associated file can be deleted.
173
        """
174
        # Check if all children and the disk itself is destroyed.
175
        return (self.destroyed is not None) and self.children_deletable
176

177 178 179
    @property
    def children_deletable(self):
        """True if all children of the disk are deletable.
180
        """
181
        return all(i.is_deletable for i in self.derivatives.all())
182

183
    @property
184
    def is_in_use(self):
185
        """True if disk is attached to an active VM.
186 187 188 189

        'In use' means the disk is attached to a VM which is not STOPPED, as
        any other VMs leave the disk in an inconsistent state.
        """
190
        return any(i.state != 'STOPPED' for i in self.instance_set.all())
191

192 193
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.
194

195 196 197
        This method manipulates the database only.
        """
        type_mapping = {
198 199 200
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
201 202 203 204 205 206
        }

        if self.type not in type_mapping.keys():
            raise self.WrongDiskTypeError(self.type)

        new_type = type_mapping[self.type]
207

208 209 210
        return Disk.create(base=self, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)
211 212

    def get_vmdisk_desc(self):
213 214
        """Serialize disk object to the vmdriver.
        """
215
        return {
216
            'source': self.path,
217
            'driver_type': self.vm_format,
218
            'driver_cache': 'none',
219
            'target_device': self.device_type + self.dev_num,
220
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
221 222
        }

223
    def get_disk_desc(self):
224 225
        """Serialize disk object to the storage driver.
        """
226 227 228 229 230 231
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': self.base.filename if self.base else None,
232
            'type': 'snapshot' if self.base else 'normal'
233 234
        }

235
    def get_remote_queue_name(self, queue_id='storage', check_worker=True):
236 237
        """Returns the proper queue name based on the datastore.
        """
238
        if self.datastore:
239
            return self.datastore.get_remote_queue_name(queue_id, check_worker)
240 241 242
        else:
            return None

243
    def __unicode__(self):
244
        return u"%s (#%d)" % (self.name, self.id or 0)
245

246 247 248 249 250
    def clean(self, *args, **kwargs):
        if self.size == "" and self.base:
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

251
    def deploy(self, user=None, task_uuid=None, timeout=15):
252 253 254 255 256
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

257 258 259 260 261 262 263
        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

264 265 266 267
        :return: True if a new reification of the disk has been created;
                 otherwise, False.
        :rtype: bool
        """
268 269 270 271
        if self.destroyed:
            self.destroyed = None
            self.save()

272
        if self.ready:
273
            return True
274 275 276 277
        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:

            # Delegate create / snapshot jobs
278
            queue_name = self.get_remote_queue_name('storage')
279
            disk_desc = self.get_disk_desc()
280
            if self.base is not None:
281 282
                with act.sub_activity('creating_snapshot'):
                    remote_tasks.snapshot.apply_async(args=[disk_desc],
283 284
                                                      queue=queue_name
                                                      ).get(timeout=timeout)
285 286 287
            else:
                with act.sub_activity('creating_disk'):
                    remote_tasks.create.apply_async(args=[disk_desc],
288 289
                                                    queue=queue_name
                                                    ).get(timeout=timeout)
290 291

            return True
292

293
    def deploy_async(self, user=None):
294 295
        """Execute deploy asynchronously.
        """
296 297
        return local_tasks.deploy.apply_async(args=[self, user],
                                              queue="localhost.man")
298

299
    @classmethod
300 301 302
    def create(cls, instance=None, user=None, **params):
        """Create disk with activity.
        """
303
        datastore = params.pop('datastore', DataStore.objects.get())
304 305
        filename = params.pop('filename', str(uuid.uuid4()))
        disk = cls(filename=filename, datastore=datastore, **params)
306
        disk.save()
307 308 309 310 311
        with disk_activity(code_suffix="create",
                           user=user,
                           disk=disk):
            if instance:
                instance.disks.add(disk)
312
        return disk
313

314
    @classmethod
315
    def create_empty(cls, instance=None, user=None, **kwargs):
316 317
        """Create empty Disk object.

318 319
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
320
        :param user: Creator of the disk.
321
        :type user: django.contrib.auth.User
322 323

        :return: Disk object without a real image, to be .deploy()ed later.
324
        """
325
        disk = Disk.create(instance, user, **kwargs)
326
        return disk
327 328

    @classmethod
329
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
330
        """Create disk object and download data from url asynchrnously.
331

332 333
        :param url: URL of image to download.
        :type url: string
334 335
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
336 337 338 339 340 341
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
342 343
        kwargs.update({'cls': cls, 'url': url,
                       'instance': instance, 'user': user})
cloud committed
344 345
        return local_tasks.create_from_url.apply_async(
            kwargs=kwargs, queue='localhost.man')
346

347
    @classmethod
348 349
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
350 351 352 353
        """Create disk object and download data from url synchronusly.

        :param url: image url to download.
        :type url: url
354 355
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
356 357
        :param user: owner of the disk
        :type user: django.contrib.auth.User
358 359
        :param task_uuid: UUID of the local task
        :param abortable_task: UUID of the remote running abortable task.
360

361 362
        :return: The created Disk object
        :rtype: Disk
363
        """
364
        kwargs.setdefault('name', url.split('/')[-1])
365
        disk = Disk.create(type="iso", instance=instance, user=user,
366
                           size=None, **kwargs)
367
        # TODO get proper datastore
368
        disk.datastore = DataStore.objects.get()
369 370
        queue_name = disk.get_remote_queue_name('storage')

371 372 373 374 375 376 377 378 379
        def __on_abort(activity, error):
            activity.disk.destroyed = timezone.now()
            activity.disk.save()

        if abortable_task:
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass
380

381
        with disk_activity(code_suffix='deploy', disk=disk,
382
                           task_uuid=task_uuid, user=user,
383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
                           on_abort=__on_abort) as act:
            with act.sub_activity('downloading_disk'):
                result = remote_tasks.download.apply_async(
                    kwargs={'url': url, 'parent_id': task_uuid,
                            'disk': disk.get_disk_desc()},
                    queue=queue_name)
                while True:
                    try:
                        size = result.get(timeout=5)
                        break
                    except TimeoutError:
                        if abortable_task and abortable_task.is_aborted():
                            AbortableAsyncResult(result.id).abort()
                            raise AbortException("Download aborted by user.")
                disk.size = size
                disk.save()
399
        return disk
400

401
    def destroy(self, user=None, task_uuid=None):
402 403 404
        if self.destroyed:
            return False

405 406 407 408
        with disk_activity(code_suffix='destroy', disk=self,
                           task_uuid=task_uuid, user=user):
            self.destroyed = timezone.now()
            self.save()
409

410
            return True
411

412
    def destroy_async(self, user=None):
413 414
        """Execute destroy asynchronously.
        """
415 416
        return local_tasks.destroy.apply_async(args=[self, user],
                                               queue='localhost.man')
417

418
    def restore(self, user=None, task_uuid=None):
419
        """Recover destroyed disk from trash if possible.
420 421 422 423 424 425 426 427
        """
        # TODO
        pass

    def restore_async(self, user=None):
        local_tasks.restore.apply_async(args=[self, user],
                                        queue='localhost.man')

428 429 430 431 432
    def save_as_async(self, disk, task_uuid=None, timeout=300, user=None):
        return local_tasks.save_as.apply_async(args=[disk, timeout, user],
                                               queue="localhost.man")

    def save_as(self, user=None, task_uuid=None, timeout=300):
433 434 435 436 437
        """Save VM as template.

        VM must be in STOPPED state to perform this action.
        The timeout parameter is not used now.
        """
438 439 440 441 442 443
        mapping = {
            'qcow2-snap': ('qcow2-norm', self.base),
        }
        if self.type not in mapping.keys():
            raise self.WrongDiskTypeError(self.type)

444
        if self.is_in_use:
445 446 447 448 449
            raise self.DiskInUseError(self)

        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

450
        new_type, new_base = mapping[self.type]
451

452 453 454
        disk = Disk.create(base=new_base, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)
455

456 457
        disk.save()
        with disk_activity(code_suffix="save_as", disk=self,
458 459 460 461 462 463 464 465 466
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                     disk.get_disk_desc()],
                                               queue=queue_name
                                               ).get()  # Timeout
            return disk
467 468 469 470 471 472 473 474


class DiskActivity(ActivityModel):
    disk = ForeignKey(Disk, related_name='activity_log',
                      help_text=_('Disk this activity works on.'),
                      verbose_name=_('disk'))

    @classmethod
475
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
476
        act = cls(activity_code='storage.Disk.' + code_suffix,
477
                  disk=disk, parent=None, started=timezone.now(),
478
                  task_uuid=task_uuid, user=user)
479
        act.save()
480
        return act
481

482 483 484 485 486 487 488 489 490
    def __unicode__(self):
        if self.parent:
            return '{}({})->{}'.format(self.parent.activity_code,
                                       self.disk,
                                       self.activity_code)
        else:
            return '{}({})'.format(self.activity_code,
                                   self.disk)

491 492 493
    def create_sub(self, code_suffix, task_uuid=None):
        act = DiskActivity(
            activity_code=self.activity_code + '.' + code_suffix,
494
            disk=self.disk, parent=self, started=timezone.now(),
495 496 497
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act
498

499 500 501
    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        act = self.create_sub(code_suffix, task_uuid)
502
        return activitycontextimpl(act)
503

504

505
@contextmanager
506 507
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
508
    act = DiskActivity.create(code_suffix, disk, task_uuid, user)
509
    return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)