models.py 17.9 KB
Newer Older
1 2
# coding=utf-8

3
from contextlib import contextmanager
4
import logging
5
from os.path import join
6 7
import uuid

8
from django.db.models import (Model, CharField, DateTimeField,
9
                              ForeignKey)
10
from django.utils import timezone
11
from django.utils.translation import ugettext_lazy as _
12
from model_utils.models import TimeStampedModel
13
from sizefield.models import FileSizeField
14
from datetime import timedelta
15

16
from acl.models import AclBase
17
from .tasks import local_tasks, remote_tasks
18
from celery.exceptions import TimeoutError
19
from manager.mancelery import celery
20
from common.models import (ActivityModel, activitycontextimpl,
21
                           WorkerNotFound)
22 23 24 25

logger = logging.getLogger(__name__)


26
class DataStore(Model):

    """A named storage location holding virtual disks.

    The name, the path and the hostname are each declared unique; the
    hostname is also used as the prefix of the datastore's queue names.
    """
    name = CharField(verbose_name=_('name'), max_length=100, unique=True)
    path = CharField(verbose_name=_('path'), max_length=200, unique=True)
    hostname = CharField(verbose_name=_('hostname'), max_length=40,
                         unique=True)

    class Meta:
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')
        ordering = ['name']

    def __unicode__(self):
        label_parts = (self.name, self.path)
        return u'%s (%s)' % label_parts

    def get_remote_queue_name(self, queue_id, check_worker=True):
        """Return the name of one of this datastore's task queues.

        :param queue_id: suffix appended to the hostname after a dot
        :param check_worker: when true, the queue is first looked up with
                             ``local_tasks.check_queue``
        :return: ``<hostname>.<queue_id>``
        :raises WorkerNotFound: if the check was requested and it failed
        """
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
        worker_missing = (check_worker and
                          not local_tasks.check_queue(self.hostname,
                                                      queue_id))
        if worker_missing:
            raise WorkerNotFound()
        return self.hostname + '.' + queue_id

    def get_deletable_disks(self):
        """Return the filenames of this datastore's deletable disks.

        Only disks with a ``destroyed`` timestamp are fetched; each of them
        is then kept or dropped according to its ``is_deletable`` property.
        """
        destroyed_disks = self.disk_set.filter(destroyed__isnull=False)
        return [candidate.filename for candidate in destroyed_disks
                if candidate.is_deletable]
56

57

58
class Disk(AclBase, TimeStampedModel):

    """A virtual disk.

    A disk lives on a datastore, may be derived from another disk through
    ``base`` and carries a ``destroyed`` timestamp once it was destroyed.
    """
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
    # (stored value, label) pairs offered as choices of the ``type`` field.
    TYPES = [
        ('qcow2-norm', 'qcow2 normal'),
        ('qcow2-snap', 'qcow2 snapshot'),
        ('iso', 'iso'),
        ('raw-ro', 'raw read-only'),
        ('raw-rw', 'raw'),
    ]
    name = CharField(verbose_name=_("name"), max_length=100, blank=True)
    filename = CharField(verbose_name=_("filename"), max_length=256,
                         unique=True)
    datastore = ForeignKey(DataStore,
                           help_text=_("The datastore that holds the disk."),
                           verbose_name=_("datastore"))
    type = CharField(choices=TYPES, max_length=10)
    size = FileSizeField()
    # The disk this one was derived from; reachable backwards through the
    # ``derivatives`` relation.
    base = ForeignKey('self', related_name='derivatives',
                      null=True, blank=True)
    dev_num = CharField(verbose_name=_("device number"), max_length=1,
                        default='a')
    # Time of destruction; NULL while the disk is not destroyed.
    destroyed = DateTimeField(null=True, blank=True, default=None)

    class Meta:
        verbose_name = _('disk')
        verbose_name_plural = _('disks')
        ordering = ['name']

87 88
    class WrongDiskTypeError(Exception):

        """Raised when an operation is not available for a disk type.

        The offending type is kept in the ``type`` attribute.
        """

        def __init__(self, type, message=None):
            text = message
            if text is None:
                # Build the default message only when none was supplied.
                template = "Operation can't be invoked on a disk of type '%s'."
                text = template % type
            Exception.__init__(self, text)
            self.type = type
97

98 99
    class DiskInUseError(Exception):

        """Raised when an operation is refused because the disk is in use.

        The disk concerned is kept in the ``disk`` attribute.
        """

        def __init__(self, disk, message=None):
            text = message
            if text is None:
                # Build the default message only when none was supplied.
                text = ("The requested operation can't be performed on "
                        "disk '%s (%s)' because it is in use." %
                        (disk.name, disk.filename))
            Exception.__init__(self, text)
            self.disk = disk
109

110
    @property
    def ready(self):
        """Finished deploy activities of the disk.

        Returns the ``activity_log`` entries whose code ends with "deploy"
        and whose ``succeeded`` field is not NULL.

        NOTE(review): if ``succeeded`` can be False, a failed deploy is
        matched as well -- confirm that this is intended.
        """
        finished_deploys = self.activity_log.filter(
            succeeded__isnull=False,
            activity_code__endswith="deploy")
        return finished_deploys

115
    @property
    def path(self):
        """Location of the disk: the datastore's path joined with the
        filename of the disk.
        """
        directory = self.datastore.path
        return join(directory, self.filename)
120 121

    @property
    def vm_format(self):
        """File format name used in the VM-side description of the disk.

        Both qcow2 types map to 'qcow2', every other known type (including
        'iso') maps to 'raw'. An unknown type raises KeyError.
        """
        driver_formats = dict.fromkeys(('qcow2-norm', 'qcow2-snap'), 'qcow2')
        driver_formats.update(
            dict.fromkeys(('iso', 'raw-ro', 'raw-rw'), 'raw'))
        return driver_formats[self.type]

133
    @property
    def format(self):
        """File format name used in the storage-side description of the disk.

        Unlike ``vm_format``, the 'iso' type maps to 'iso' here. An unknown
        type raises KeyError.
        """
        storage_formats = {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
            'iso': 'iso',
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }
        return storage_formats[self.type]

    @property
    def device_type(self):
        """Device name prefix that belongs to the type of the disk.

        'iso' disks get 'hd', every other known type gets 'vd'. An unknown
        type raises KeyError.
        """
        prefixes = dict.fromkeys(
            ('qcow2-norm', 'qcow2-snap', 'raw-ro', 'raw-rw'), 'vd')
        prefixes['iso'] = 'hd'
        return prefixes[self.type]
156

157
    def is_downloading(self):
        """Return the unfinished download activities of the disk.

        These are the ``activity_log`` entries whose code ends with
        "downloading_disk" and whose ``succeeded`` field is still NULL.
        """
        running_downloads = self.activity_log.filter(
            succeeded__isnull=True,
            activity_code__endswith="downloading_disk")
        return running_downloads
161 162 163 164

    def get_download_percentage(self):
        """Return the progress reported for a running download.

        The task is looked up through the ``task_uuid`` of the first
        unfinished activity whose code ends with "deploy".

        :return: the "percent" entry of the task's info (presumably a
                 percentage -- confirm against the download task); None if
                 the disk is not downloading, if there is no unfinished
                 deploy activity, or if the task has no info mapping yet.
        """
        if not self.is_downloading():
            return None
        # Slice instead of indexing with [0]: an empty result is not an
        # error here.
        unfinished = list(self.activity_log.filter(
            activity_code__endswith="deploy",
            succeeded__isnull=True)[:1])
        if not unfinished:
            return None
        info = celery.AsyncResult(id=unfinished[0].task_uuid).info
        # The info of a task is None or an exception instance until the
        # task publishes a meta dictionary; neither of them has ``get``.
        if not hasattr(info, 'get'):
            return None
        return info.get("percent")

171
    @property
    def is_deletable(self):
        """True if the associated file can be deleted.

        That is the case when the disk was destroyed more than one day ago
        and all of its children are deletable too.
        """
        if self.destroyed is None:
            return False
        yesterday = timezone.now() - timedelta(days=1)
        if not (self.destroyed < yesterday):
            return False
        # The disk itself qualifies; its children decide the outcome.
        return self.children_deletable
179

180 181 182
    @property
    def children_deletable(self):
        """True if every disk in ``derivatives`` is deletable (also true
        when there are no derivatives).
        """
        for child in self.derivatives.all():
            if not child.is_deletable:
                return False
        return True
185

186
    @property
    def is_in_use(self):
        """True if disk is attached to an active VM.

        The disk counts as being in use when any instance in its
        ``instance_set`` has a state other than 'STOPPED', as a VM in any
        other state leaves the disk in an inconsistent state.
        """
        for instance in self.instance_set.all():
            if instance.state != 'STOPPED':
                return True
        return False
194

195 196
    def get_exclusive(self):
        """Create a disk based on this one for exclusive usage.

        The new disk shares the datastore, name and size of this disk, has
        this disk as its base and gets the type paired with this disk's
        type below. Only a database record is created here (through
        ``Disk.create``); no remote task is started by this method.

        :return: the newly created disk
        :raises WrongDiskTypeError: if the type of this disk has no pair
        """
        exclusive_types = {
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
        }
        new_type = exclusive_types.get(self.type)
        if new_type is None:
            raise self.WrongDiskTypeError(self.type)

        return Disk.create(type=new_type, base=self, name=self.name,
                           size=self.size, datastore=self.datastore)
214 215

    def get_vmdisk_desc(self):
        """Describe the disk as a dictionary for the vmdriver.

        The target device is the device prefix followed by ``dev_num``;
        'iso' disks are described as a 'cdrom', all others as a 'disk'.
        """
        description = {
            'source': self.path,
            'driver_type': self.vm_format,
            'driver_cache': 'none',
            'target_device': self.device_type + self.dev_num,
        }
        if self.type == 'iso':
            description['disk_device'] = 'cdrom'
        else:
            description['disk_device'] = 'disk'
        return description

226
    def get_disk_desc(self):
        """Describe the disk as a dictionary for the storage driver.

        A disk with a base is described as a 'snapshot' together with the
        filename of its base; a disk without one is 'normal' and its
        'base_name' is None.
        """
        description = {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
        }
        base = self.base
        if base:
            description['base_name'] = base.filename
            description['type'] = 'snapshot'
        else:
            description['base_name'] = None
            description['type'] = 'normal'
        return description

238
    def get_remote_queue_name(self, queue_id='storage', check_worker=True):
        """Return the queue name given by the datastore of the disk.

        Both arguments are handed over to the datastore's method of the
        same name. None is returned if the disk has no datastore.
        """
        if not self.datastore:
            return None
        return self.datastore.get_remote_queue_name(queue_id, check_worker)

246
    def __unicode__(self):
        # A disk that has no id is shown with #0.
        shown_id = self.id or 0
        return u"%s (#%d)" % (self.name, shown_id)
248

249 250 251 252 253
    def clean(self, *args, **kwargs):
        """Take over the size of the base when the size is an empty string,
        then run the inherited ``clean``.
        """
        size_is_blank = self.size == ""
        if size_is_blank and self.base:
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

254
    def deploy(self, user=None, task_uuid=None, timeout=15):
        """Reify the disk model on the associated data store.

        A ``destroyed`` mark on the disk is cleared and saved first. If
        ``ready`` is truthy nothing else happens. Otherwise a remote
        ``snapshot`` task (disk with a base) or ``create`` task (disk
        without one) is sent to the storage queue and its result is waited
        for inside a 'deploy' activity.

        :param self: the disk model to reify
        :type self: storage.models.Disk

        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

        :param timeout: passed as ``timeout`` to the ``get`` call that
                        waits for the result of the remote task

        :return: True, both when the disk was already deployed and when the
                 remote task has finished.
        :rtype: bool
        """
        if self.destroyed:
            self.destroyed = None
            self.save()

        if self.ready:
            return True

        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:
            # Delegate create / snapshot jobs
            queue_name = self.get_remote_queue_name('storage')
            disk_desc = self.get_disk_desc()
            if self.base is not None:
                sub_code, task_name = 'creating_snapshot', 'snapshot'
            else:
                sub_code, task_name = 'creating_disk', 'create'

            with act.sub_activity(sub_code):
                remote_task = getattr(remote_tasks, task_name)
                pending = remote_task.apply_async(args=[disk_desc],
                                                  queue=queue_name)
                pending.get(timeout=timeout)

            return True
295

296
    def deploy_async(self, user=None):
        """Queue the local ``deploy`` task for this disk.

        :return: the result of ``apply_async`` on the 'localhost.man' queue
        """
        task_args = [self, user]
        return local_tasks.deploy.apply_async(queue="localhost.man",
                                              args=task_args)
301

302
    @classmethod
    def create(cls, instance=None, user=None, **params):
        """Create disk with activity.

        The disk gets a freshly generated UUID as its filename and is saved
        before a 'create' activity is recorded for it.

        :param instance: if given, the new disk is added to its ``disks``
        :param user: user recorded on the 'create' activity
        :param params: field values of the new disk; ``datastore`` may be
                       left out, in which case the result of
                       ``DataStore.objects.get()`` is used
        :return: the saved disk
        """
        # The fallback is looked up only when it is needed: the query fails
        # unless exactly one datastore exists, which must not affect callers
        # that supply their own datastore.
        try:
            datastore = params.pop('datastore')
        except KeyError:
            datastore = DataStore.objects.get()
        disk = cls(filename=str(uuid.uuid4()), datastore=datastore, **params)
        disk.save()
        with disk_activity(code_suffix="create",
                           user=user,
                           disk=disk):
            if instance:
                instance.disks.add(disk)
        return disk
315

316
    @classmethod
    def create_empty(cls, instance=None, user=None, **kwargs):
        """Create empty Disk object.

        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User
        :param kwargs: further arguments handed over to ``create``

        :return: Disk object without a real image, to be .deploy()ed later.
        """
        # Hand over the instance and the user that were received; passing
        # None for both would silently drop the attachment and the creator.
        return cls.create(instance=instance, user=user, **kwargs)
329 330

    @classmethod
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
        """Create disk object and download data from url asynchronously.

        The class, the url, the instance and the user are added to the
        received keyword arguments, which are then sent to the local
        ``create_from_url`` task on the 'localhost.man' queue.

        :param url: URL of image to download.
        :type url: string
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
        kwargs.update(cls=cls, url=url, instance=instance, user=user)
        task = local_tasks.create_from_url
        return task.apply_async(queue='localhost.man', kwargs=kwargs)
348

349
    @classmethod
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
        """Create disk object and download data from url synchronously.

        An 'iso' disk of size 1 is created first; its name defaults to the
        last segment of the url. The remote ``download`` task is then
        started on the storage queue and polled every 5 seconds. The value
        it returns is saved as the size of the disk.

        :param url: image url to download.
        :type url: url
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
        :param user: owner of the disk
        :type user: django.contrib.auth.User
        :param task_uuid: UUID of the local task
        :param abortable_task: optional object whose ``is_aborted()`` is
                               checked whenever a poll times out; once it
                               reports an abort, the remote download is
                               aborted and an exception is raised

        :return: The created Disk object
        :rtype: Disk
        """
        default_name = url.split('/')[-1]
        kwargs.setdefault('name', default_name)
        disk = Disk.create(instance=instance, user=user, type="iso",
                           size=1, **kwargs)
        # TODO get proper datastore
        disk.datastore = DataStore.objects.get()
        queue_name = disk.get_remote_queue_name('storage')

        def mark_destroyed(activity, error):
            # Handed to the activity as ``on_abort`` (presumably called when
            # the activity fails): flags the disk as destroyed.
            aborted_disk = activity.disk
            aborted_disk.destroyed = timezone.now()
            aborted_disk.save()

        if abortable_task:
            # These names are needed only if aborting is possible at all.
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass

        with disk_activity(code_suffix='deploy', disk=disk,
                           task_uuid=task_uuid, user=user,
                           on_abort=mark_destroyed) as act:
            with act.sub_activity('downloading_disk'):
                download_kwargs = {
                    'url': url,
                    'parent_id': task_uuid,
                    'disk': disk.get_disk_desc(),
                }
                result = remote_tasks.download.apply_async(
                    kwargs=download_kwargs, queue=queue_name)
                while True:
                    try:
                        size = result.get(timeout=5)
                    except TimeoutError:
                        # Not finished yet: obey a pending abort request,
                        # otherwise keep on waiting.
                        if abortable_task and abortable_task.is_aborted():
                            AbortableAsyncResult(result.id).abort()
                            raise AbortException("Download aborted by user.")
                    else:
                        break
                disk.size = size
                disk.save()
        return disk
402

403
    def destroy(self, user=None, task_uuid=None):
        """Mark the disk as destroyed.

        Only the ``destroyed`` timestamp is set and saved here, inside a
        'destroy' activity.

        :return: False if the disk was already marked as destroyed;
                 otherwise, True.
        """
        already_destroyed = bool(self.destroyed)
        if already_destroyed:
            return False

        activity = disk_activity(code_suffix='destroy', disk=self,
                                 task_uuid=task_uuid, user=user)
        with activity:
            self.destroyed = timezone.now()
            self.save()

            return True
413

414
    def destroy_async(self, user=None):
        """Queue the local ``destroy`` task for this disk.

        :return: the result of ``apply_async`` on the 'localhost.man' queue
        """
        task_args = [self, user]
        return local_tasks.destroy.apply_async(queue='localhost.man',
                                               args=task_args)
419

420
    def restore(self, user=None, task_uuid=None):
        """Recover destroyed disk from trash if possible.

        Not implemented yet: nothing is done and None is returned.
        """
        # TODO
        return None

    def restore_async(self, user=None):
        """Queue the local ``restore`` task for this disk.

        :return: the result of ``apply_async`` on the 'localhost.man'
                 queue, as returned by the other ``*_async`` methods too
        """
        return local_tasks.restore.apply_async(args=[self, user],
                                               queue='localhost.man')

430 431 432 433 434
    def save_as_async(self, disk, task_uuid=None, timeout=300, user=None):
        """Queue the local ``save_as`` task.

        The task receives ``disk``, ``timeout`` and ``user``; ``task_uuid``
        is accepted but not used.

        NOTE(review): the disk handed to the task is the ``disk`` argument,
        not ``self`` -- confirm against the callers that this is intended.

        :return: the result of ``apply_async`` on the 'localhost.man' queue
        """
        task_args = [disk, timeout, user]
        return local_tasks.save_as.apply_async(queue="localhost.man",
                                               args=task_args)

    def save_as(self, user=None, task_uuid=None, timeout=300):
        """Save the disk as a new disk to be used in a template.

        Only 'qcow2-snap' disks are supported: a new 'qcow2-norm' disk is
        created on the base of this disk, then the remote ``merge`` task is
        run with the descriptions of both disks and waited for.

        The disk must not be in use, so the VM must be in STOPPED state to
        perform this action.
        The timeout parameter is not used now.

        :return: the newly created disk
        :raises WrongDiskTypeError: if the type of the disk is unsupported
        :raises DiskInUseError: if the disk is in use
        """
        mapping = {
            'qcow2-snap': ('qcow2-norm', self.base),
        }
        try:
            new_type, new_base = mapping[self.type]
        except KeyError:
            raise self.WrongDiskTypeError(self.type)

        if self.is_in_use:
            raise self.DiskInUseError(self)

        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

        disk = Disk.create(type=new_type, base=new_base, name=self.name,
                           size=self.size, datastore=self.datastore)
        disk.save()

        with disk_activity(code_suffix="save_as", disk=self,
                           user=user, task_uuid=task_uuid):
            with disk_activity(code_suffix="deploy", disk=disk,
                               user=user, task_uuid=task_uuid):
                queue_name = self.get_remote_queue_name('storage')
                merge_args = [self.get_disk_desc(), disk.get_disk_desc()]
                pending = remote_tasks.merge.apply_async(args=merge_args,
                                                         queue=queue_name)
                pending.get()  # Timeout
            return disk
469 470 471 472 473 474 475 476


class DiskActivity(ActivityModel):

    """Activity log entry of a disk.

    The entries of a disk are reachable through its ``activity_log``
    relation. Sub-activities have their parent activity in ``parent``.
    """
    disk = ForeignKey(Disk, related_name='activity_log',
                      help_text=_('Disk this activity works on.'),
                      verbose_name=_('disk'))

    @classmethod
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
        """Create and save a top level activity that starts now.

        :param code_suffix: appended to 'storage.Disk.' to get the code
        :param disk: the disk the activity works on
        :param task_uuid: UUID of the task stored with the activity
        :param user: user stored with the activity
        :return: the saved activity
        """
        act = cls(activity_code='storage.Disk.' + code_suffix,
                  disk=disk, parent=None, started=timezone.now(),
                  task_uuid=task_uuid, user=user)
        act.save()
        return act

    def __unicode__(self):
        # The templates are unicode on purpose: with a byte string template
        # Python 2 would format the disk through str() and return bytes,
        # which breaks unicode() for disks with non-ASCII names.
        if self.parent:
            return u'{}({})->{}'.format(self.parent.activity_code,
                                        self.disk,
                                        self.activity_code)
        else:
            return u'{}({})'.format(self.activity_code,
                                    self.disk)

    def create_sub(self, code_suffix, task_uuid=None):
        """Create and save a sub-activity of this activity.

        The code of the sub-activity is the code of this activity extended
        with a dot and ``code_suffix``; the disk and the user are taken
        over from this activity.

        :return: the saved sub-activity
        """
        act = DiskActivity(
            activity_code=self.activity_code + '.' + code_suffix,
            disk=self.disk, parent=self, started=timezone.now(),
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act

    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        """Context manager that wraps a newly created sub-activity.

        The function returns the result of ``activitycontextimpl`` instead
        of yielding; presumably that result is the generator which is
        driven by ``contextmanager`` -- confirm in common.models.
        """
        act = self.create_sub(code_suffix, task_uuid)
        return activitycontextimpl(act)
505

506

507
@contextmanager
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
    """Context manager that wraps a newly created top level disk activity.

    The activity is created with ``DiskActivity.create``; ``on_abort`` and
    ``on_commit`` are handed over to ``activitycontextimpl`` unchanged.

    The function returns the result of ``activitycontextimpl`` instead of
    yielding; presumably that result is the generator which is driven by
    ``contextmanager`` -- confirm in common.models.
    """
    activity = DiskActivity.create(code_suffix, disk, task_uuid, user)
    callbacks = {'on_abort': on_abort, 'on_commit': on_commit}
    return activitycontextimpl(activity, **callbacks)