models.py 16.4 KB
Newer Older
1 2
# coding=utf-8

3
from contextlib import contextmanager
4
import logging
5
from os.path import join
6 7
import uuid

8
from django.db.models import (Model, BooleanField, CharField, DateTimeField,
9
                              ForeignKey)
10
from django.utils import timezone
11
from django.utils.translation import ugettext_lazy as _
12
from model_utils.models import TimeStampedModel
13
from sizefield.models import FileSizeField
14
from datetime import timedelta
15

16
from acl.models import AclBase
17
from .tasks import local_tasks, remote_tasks
18
from celery.exceptions import TimeoutError
19
from common.models import ActivityModel, activitycontextimpl, WorkerNotFound
20 21 22 23

logger = logging.getLogger(__name__)


24
class DataStore(Model):
Guba Sándor committed
25

26 27
    """Collection of virtual disks.
    """
28 29 30 31
    name = CharField(max_length=100, unique=True, verbose_name=_('name'))
    path = CharField(max_length=200, unique=True, verbose_name=_('path'))
    hostname = CharField(max_length=40, unique=True,
                         verbose_name=_('hostname'))
Guba Sándor committed
32

33 34 35 36 37 38 39 40
    class Meta:
        ordering = ['name']
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')

    def __unicode__(self):
        return u'%s (%s)' % (self.name, self.path)

41
    def get_remote_queue_name(self, queue_id, check_worker=True):
42 43
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
44 45
        if not check_worker or local_tasks.check_queue(self.hostname,
                                                       queue_id):
46 47 48
            return self.hostname + '.' + queue_id
        else:
            raise WorkerNotFound()
49

50 51 52 53 54
    def get_deletable_disks(self):
        return [disk.filename for disk in
                self.disk_set.filter(
                    destroyed__isnull=False) if disk.is_deletable()]

55

56
class Disk(AclBase, TimeStampedModel):
Guba Sándor committed
57

58 59
    """A virtual disk.
    """
60 61 62 63 64
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
65 66
    TYPES = [('qcow2-norm', 'qcow2 normal'), ('qcow2-snap', 'qcow2 snapshot'),
             ('iso', 'iso'), ('raw-ro', 'raw read-only'), ('raw-rw', 'raw')]
67 68 69 70 71
    name = CharField(blank=True, max_length=100, verbose_name=_("name"))
    filename = CharField(max_length=256, verbose_name=_("filename"))
    datastore = ForeignKey(DataStore, verbose_name=_("datastore"),
                           help_text=_("The datastore that holds the disk."))
    type = CharField(max_length=10, choices=TYPES)
72
    size = FileSizeField()
73 74
    base = ForeignKey('self', blank=True, null=True,
                      related_name='derivatives')
75 76
    ready = BooleanField(default=False,
                         help_text=_("The associated resource is ready."))
77
    dev_num = CharField(default='a', max_length=1,
78
                        verbose_name=_("device number"))
79
    destroyed = DateTimeField(blank=True, default=None, null=True)
80 81 82 83 84 85

    class Meta:
        ordering = ['name']
        verbose_name = _('disk')
        verbose_name_plural = _('disks')

86 87
    class WrongDiskTypeError(Exception):

88 89 90 91 92 93 94 95
        def __init__(self, type, message=None):
            if message is None:
                message = ("Operation can't be invoked on a disk of type '%s'."
                           % type)

            Exception.__init__(self, message)

            self.type = type
96

97 98
    class DiskInUseError(Exception):

99 100 101 102
        def __init__(self, disk, message=None):
            if message is None:
                message = ("The requested operation can't be performed on "
                           "disk '%s (%s)' because it is in use." %
Dudás Ádám committed
103
                           (disk.name, disk.filename))
104 105 106 107

            Exception.__init__(self, message)

            self.disk = disk
108

109 110
    @property
    def path(self):
111
        """Get the path where the files are stored."""
112
        return join(self.datastore.path, self.filename)
113 114 115

    @property
    def format(self):
116
        """Returns the proper file format for different type of images."""
117 118 119
        return {
            'qcow2-norm': 'qcow2',
            'qcow2-snap': 'qcow2',
120
            'iso': 'raw',
121 122 123 124
            'raw-ro': 'raw',
            'raw-rw': 'raw',
        }[self.type]

125 126
    @property
    def device_type(self):
127
        """Returns the proper device prefix for different file format."""
128
        return {
129 130
            'qcow2-norm': 'vd',
            'qcow2-snap': 'vd',
131
            'iso': 'hd',
132 133 134
            'raw-ro': 'vd',
            'raw-rw': 'vd',
        }[self.type]
135

136
    def is_deletable(self):
137 138 139 140 141 142 143 144
        """Returns whether the file can be deleted.

        Checks if all children and the disk itself is destroyed.
        """

        yesterday = timezone.now() - timedelta(days=1)
        return (self.destroyed is not None
                and self.destroyed < yesterday) and not self.has_active_child()
145 146

    def has_active_child(self):
147 148 149 150
        """Returns if disk has children that are not destroyed.
        """

        return any((not i.is_deletable() for i in self.derivatives.all()))
151

152
    def is_in_use(self):
153 154 155 156 157
        """Returns if disk is attached to an active VM.

        'In use' means the disk is attached to a VM which is not STOPPED, as
        any other VMs leave the disk in an inconsistent state.
        """
158
        return any([i.state != 'STOPPED' for i in self.instance_set.all()])
159

160 161
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.
162

163 164 165
        This method manipulates the database only.
        """
        type_mapping = {
166 167 168
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
169 170 171 172 173
        }

        if self.type not in type_mapping.keys():
            raise self.WrongDiskTypeError(self.type)

174
        filename = self.filename if self.type == 'iso' else None
175
        new_type = type_mapping[self.type]
176

177 178 179
        return Disk.objects.create(base=self, datastore=self.datastore,
                                   filename=filename, name=self.name,
                                   size=self.size, type=new_type)
180 181

    def get_vmdisk_desc(self):
182
        """Serialize disk object to the vmdriver."""
183
        return {
184
            'source': self.path,
185
            'driver_type': self.format,
186
            'driver_cache': 'none',
187
            'target_device': self.device_type + self.dev_num,
188
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
189 190
        }

191
    def get_disk_desc(self):
192
        """Serialize disk object to the storage driver."""
193 194 195 196 197 198 199 200 201
        return {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
            'base_name': self.base.filename if self.base else None,
            'type': 'snapshot' if self.type == 'qcow2-snap' else 'normal'
        }

202
    def get_remote_queue_name(self, queue_id='storage', check_worker=True):
203
        """Returns the proper queue name based on the datastore."""
204
        if self.datastore:
205
            return self.datastore.get_remote_queue_name(queue_id, check_worker)
206 207 208
        else:
            return None

209
    def __unicode__(self):
210
        return u"%s (#%d)" % (self.name, self.id or 0)
211

212 213 214 215 216
    def clean(self, *args, **kwargs):
        if self.size == "" and self.base:
            self.size = self.base.size
        super(Disk, self).clean(*args, **kwargs)

217
    def save(self, *args, **kwargs):
218 219
        if self.filename is None:
            self.generate_filename()
220
        return super(Disk, self).save(*args, **kwargs)
221

222
    def deploy(self, user=None, task_uuid=None, timeout=15):
223 224 225 226 227
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

228 229 230 231 232 233 234
        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

235 236 237 238
        :return: True if a new reification of the disk has been created;
                 otherwise, False.
        :rtype: bool
        """
239 240 241 242
        if self.destroyed:
            self.destroyed = None
            self.save()

243
        if self.ready:
244
            return False
245

246 247 248 249
        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:

            # Delegate create / snapshot jobs
250
            queue_name = self.get_remote_queue_name('storage')
251 252 253 254
            disk_desc = self.get_disk_desc()
            if self.type == 'qcow2-snap':
                with act.sub_activity('creating_snapshot'):
                    remote_tasks.snapshot.apply_async(args=[disk_desc],
255 256
                                                      queue=queue_name
                                                      ).get(timeout=timeout)
257 258 259
            else:
                with act.sub_activity('creating_disk'):
                    remote_tasks.create.apply_async(args=[disk_desc],
260 261
                                                    queue=queue_name
                                                    ).get(timeout=timeout)
262 263 264

            self.ready = True
            self.save()
265

266
            return True
267

268
    def deploy_async(self, user=None):
269 270
        """Execute deploy asynchronously.
        """
271 272
        return local_tasks.deploy.apply_async(args=[self, user],
                                              queue="localhost.man")
273

274 275 276 277 278
    def generate_filename(self):
        """Generate a unique filename and set it on the object.
        """
        self.filename = str(uuid.uuid4())

279
    @classmethod
280
    def create_empty(cls, instance=None, user=None, **kwargs):
281 282
        """Create empty Disk object.

283 284
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
285
        :param user: Creator of the disk.
286
        :type user: django.contrib.auth.User
287 288

        :return: Disk object without a real image, to be .deploy()ed later.
289
        """
290 291
        with disk_activity(code_suffix="create", user=user) as act:
            disk = cls(**kwargs)
292 293
            if disk.filename is None:
                disk.generate_filename()
294 295 296 297 298 299
            disk.save()
            act.disk = disk
            act.save()
            if instance:
                instance.disks.add(disk)
            return disk
300 301

    @classmethod
302
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
303
        """Create disk object and download data from url asynchrnously.
304

305 306
        :param url: URL of image to download.
        :type url: string
307 308
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
309 310 311 312 313 314
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
315 316 317 318
        kwargs.update({'cls': cls, 'url': url,
                       'instance': instance, 'user': user})
        return local_tasks.create_from_url.apply_async(kwargs=kwargs,
                                                       queue='localhost.man')
319

320
    @classmethod
321 322
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
323 324 325 326
        """Create disk object and download data from url synchronusly.

        :param url: image url to download.
        :type url: url
327 328
        :param instance: Instance or template attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
329 330
        :param user: owner of the disk
        :type user: django.contrib.auth.User
331 332
        :param task_uuid: TODO
        :param abortable_task: TODO
333

334 335
        :return: The created Disk object
        :rtype: Disk
336
        """
337 338
        kwargs.setdefault('name', url.split('/')[-1])
        disk = cls(**kwargs)
339
        disk.generate_filename()
340 341
        disk.type = "iso"
        disk.size = 1
342
        # TODO get proper datastore
343
        disk.datastore = DataStore.objects.get()
344
        disk.save()
345 346
        if instance:
            instance.disks.add(disk)
347 348
        queue_name = disk.get_remote_queue_name('storage')

349 350 351 352 353 354 355 356 357
        def __on_abort(activity, error):
            activity.disk.destroyed = timezone.now()
            activity.disk.save()

        if abortable_task:
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass
358 359

        with disk_activity(code_suffix='download', disk=disk,
360 361 362 363 364 365 366 367 368 369 370 371 372 373
                           task_uuid=task_uuid, user=user,
                           on_abort=__on_abort):
            result = remote_tasks.download.apply_async(
                kwargs={'url': url, 'parent_id': task_uuid,
                        'disk': disk.get_disk_desc()},
                queue=queue_name)
            while True:
                try:
                    size = result.get(timeout=5)
                    break
                except TimeoutError:
                    if abortable_task and abortable_task.is_aborted():
                        AbortableAsyncResult(result.id).abort()
                        raise AbortException("Download aborted by user.")
374
            disk.size = size
375
            disk.ready = True
376
            disk.save()
377
        return disk
378

379
    def destroy(self, user=None, task_uuid=None):
380 381 382
        if self.destroyed:
            return False

383 384 385 386
        with disk_activity(code_suffix='destroy', disk=self,
                           task_uuid=task_uuid, user=user):
            self.destroyed = timezone.now()
            self.save()
387

388
            return True
389

390
    def destroy_async(self, user=None):
391 392
        """Execute destroy asynchronously.
        """
393 394
        return local_tasks.destroy.apply_async(args=[self, user],
                                               queue='localhost.man')
395

396
    def restore(self, user=None, task_uuid=None):
397
        """Recover destroyed disk from trash if possible.
398 399 400 401 402 403 404 405
        """
        # TODO
        pass

    def restore_async(self, user=None):
        local_tasks.restore.apply_async(args=[self, user],
                                        queue='localhost.man')

406
    def save_as(self, user=None, task_uuid=None, timeout=120):
407 408 409 410 411 412 413 414 415 416 417 418
        mapping = {
            'qcow2-snap': ('qcow2-norm', self.base),
        }
        if self.type not in mapping.keys():
            raise self.WrongDiskTypeError(self.type)

        if self.is_in_use():
            raise self.DiskInUseError(self)

        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

419
        with disk_activity(code_suffix='save_as', disk=self,
420
                           task_uuid=task_uuid, user=user, timeout=300):
421 422 423 424

            new_type, new_base = mapping[self.type]

            disk = Disk.objects.create(base=new_base, datastore=self.datastore,
425 426
                                       name=self.name, size=self.size,
                                       type=new_type)
427

428
            queue_name = self.get_remote_queue_name('storage')
429 430
            remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                 disk.get_disk_desc()],
431 432
                                           queue=queue_name
                                           ).get(timeout=timeout)
433 434 435 436 437 438 439 440 441 442 443 444 445

            disk.ready = True
            disk.save()

            return disk


class DiskActivity(ActivityModel):
    disk = ForeignKey(Disk, related_name='activity_log',
                      help_text=_('Disk this activity works on.'),
                      verbose_name=_('disk'))

    @classmethod
446
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
447
        act = cls(activity_code='storage.Disk.' + code_suffix,
448
                  disk=disk, parent=None, started=timezone.now(),
449
                  task_uuid=task_uuid, user=user)
450
        act.save()
451
        return act
452

453 454 455
    def create_sub(self, code_suffix, task_uuid=None):
        act = DiskActivity(
            activity_code=self.activity_code + '.' + code_suffix,
456
            disk=self.disk, parent=self, started=timezone.now(),
457 458 459
            task_uuid=task_uuid, user=self.user)
        act.save()
        return act
460

461 462 463
    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        act = self.create_sub(code_suffix, task_uuid)
464
        return activitycontextimpl(act)
465

466

467
@contextmanager
468 469
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
470
    act = DiskActivity.create(code_suffix, disk, task_uuid, user)
471
    return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)