models.py 17.2 KB
Newer Older
1 2
# coding=utf-8

3
from contextlib import contextmanager
4
import logging
5
from os.path import join
6 7
import uuid

8
from django.db.models import (Model, CharField, DateTimeField,
9
                              ForeignKey)
10
from django.utils import timezone
11
from django.utils.translation import ugettext_lazy as _
12
from model_utils.models import TimeStampedModel
13
from sizefield.models import FileSizeField
14
from datetime import timedelta
15

16
from acl.models import AclBase
17
from .tasks import local_tasks, remote_tasks
18
from celery.exceptions import TimeoutError
19
from manager.mancelery import celery
20
from common.models import ActivityModel, activitycontextimpl, WorkerNotFound
21 22 23 24

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


25
class DataStore(Model):

    """Collection of virtual disks.

    A datastore is identified by a unique name, a unique path and the
    unique hostname used to build its task queue names.
    """
    name = CharField(verbose_name=_('name'), max_length=100, unique=True)
    path = CharField(verbose_name=_('path'), max_length=200, unique=True)
    hostname = CharField(verbose_name=_('hostname'), max_length=40,
                         unique=True)

    class Meta:
        verbose_name = _('datastore')
        verbose_name_plural = _('datastores')
        ordering = ['name']

    def __unicode__(self):
        label_parts = (self.name, self.path)
        return u'%s (%s)' % label_parts

    def get_remote_queue_name(self, queue_id, check_worker=True):
        """Return the queue name ``<hostname>.<queue_id>``.

        :param queue_id: Suffix of the queue name.
        :param check_worker: When true, local_tasks.check_queue is asked
                             about the queue first.

        :raises WorkerNotFound: if the check is requested and it fails.
        """
        logger.debug("Checking for storage queue %s.%s",
                     self.hostname, queue_id)
        if check_worker and not local_tasks.check_queue(self.hostname,
                                                        queue_id):
            raise WorkerNotFound()
        return self.hostname + '.' + queue_id

    def get_deletable_disks(self):
        """Return the filenames of this datastore's deletable disks.

        Only disks with a ``destroyed`` timestamp are considered; each one
        is then checked through its ``is_deletable`` property.
        """
        filenames = []
        for candidate in self.disk_set.filter(destroyed__isnull=False):
            if candidate.is_deletable:
                filenames.append(candidate.filename)
        return filenames
55

56

57
class Disk(AclBase, TimeStampedModel):

    """A virtual disk.

    Each disk is a file on a DataStore and may be derived from another
    disk through ``base``.
    """
    ACL_LEVELS = (
        ('user', _('user')),          # see all details
        ('operator', _('operator')),
        ('owner', _('owner')),        # superuser, can delete, delegate perms
    )
    # (stored value, human readable label) pairs offered by the ``type`` field.
    TYPES = [
        ('qcow2-norm', 'qcow2 normal'),
        ('qcow2-snap', 'qcow2 snapshot'),
        ('iso', 'iso'),
        ('raw-ro', 'raw read-only'),
        ('raw-rw', 'raw'),
    ]
    name = CharField(verbose_name=_("name"), max_length=100, blank=True)
    # Name of the image file inside the datastore's path.
    filename = CharField(verbose_name=_("filename"), max_length=256,
                         unique=True)
    datastore = ForeignKey(DataStore,
                           help_text=_("The datastore that holds the disk."),
                           verbose_name=_("datastore"))
    type = CharField(choices=TYPES, max_length=10)
    size = FileSizeField()
    # Disk this one was derived from; the reverse relation is ``derivatives``.
    base = ForeignKey('self', related_name='derivatives', null=True,
                      blank=True)
    # Single character appended to the device prefix of the disk.
    dev_num = CharField(verbose_name=_("device number"), max_length=1,
                        default='a')
    # Time the disk was marked destroyed; NULL while it is not destroyed.
    destroyed = DateTimeField(null=True, default=None, blank=True)

    class Meta:
        verbose_name = _('disk')
        verbose_name_plural = _('disks')
        ordering = ['name']

86 87
    class WrongDiskTypeError(Exception):

        """Raised when an operation does not support the disk's type.

        The offending type is kept in the ``type`` attribute.
        """

        def __init__(self, type, message=None):
            self.type = type

            text = message
            if text is None:
                # No custom text given: fall back to the generic message.
                text = ("Operation can't be invoked on a disk of type '%s'."
                        % type)

            Exception.__init__(self, text)
96

97 98
    class DiskInUseError(Exception):

        """Raised when an operation is refused because the disk is in use.

        The disk concerned is kept in the ``disk`` attribute.
        """

        def __init__(self, disk, message=None):
            self.disk = disk

            text = message
            if text is None:
                # No custom text given: describe the disk by name and file.
                identity = (disk.name, disk.filename)
                text = ("The requested operation can't be performed on "
                        "disk '%s (%s)' because it is in use." % identity)

            Exception.__init__(self, text)
108

109
    @property
    def ready(self):
        """Logged activities of this disk that mark it as deployed.

        Returns the queryset of ``activity_log`` entries whose code ends
        with "deploy" and whose ``succeeded`` value is not NULL; it is the
        queryset itself, not a bool.
        """
        # NOTE(review): ``succeeded__isnull=False`` also matches entries whose
        # ``succeeded`` is False -- confirm failed deploys should count.
        finished_deploys = self.activity_log.filter(
            activity_code__endswith="deploy", succeeded__isnull=False)
        return finished_deploys

114
    def path(self):
        """Return the datastore's path joined with the disk's filename.

        NOTE(review): this is a plain method, not a property; invoke it as
        ``path()``.
        """
        directory = self.datastore.path
        return join(directory, self.filename)
118 119

    @property
    def vm_format(self):
        """Returns the file format name used in the vmdriver description.

        Both qcow2 types map to 'qcow2'; iso and both raw types map to
        'raw'.  An unknown type raises KeyError.
        """
        by_type = dict.fromkeys(('qcow2-norm', 'qcow2-snap'), 'qcow2')
        by_type.update(dict.fromkeys(('iso', 'raw-ro', 'raw-rw'), 'raw'))
        return by_type[self.type]

131
    @property
    def format(self):
        """Returns the file format name used in the storage description.

        Unlike ``vm_format``, an iso disk is reported as 'iso' here.  An
        unknown type raises KeyError.
        """
        by_type = {'iso': 'iso'}
        by_type.update(dict.fromkeys(('qcow2-norm', 'qcow2-snap'), 'qcow2'))
        by_type.update(dict.fromkeys(('raw-ro', 'raw-rw'), 'raw'))
        return by_type[self.type]

    @property
    def device_type(self):
        """Returns the device name prefix for the disk's type.

        iso disks use 'hd', every other known type uses 'vd'.  An unknown
        type raises KeyError.
        """
        prefixes = dict.fromkeys(
            ('qcow2-norm', 'qcow2-snap', 'raw-ro', 'raw-rw'), 'vd')
        prefixes['iso'] = 'hd'
        return prefixes[self.type]
154

155 156 157 158 159 160 161 162 163 164 165 166 167
    def is_downloading(self):
        """Tell whether the disk's latest activity is an unfinished download.

        :return: True if the most recently created activity of this disk
                 has the code ``storage.Disk.download`` and no ``succeeded``
                 value yet; False otherwise, including when the disk has
                 no logged activity at all.
        :rtype: bool
        """
        try:
            latest = DiskActivity.objects.filter(disk=self).latest("created")
        except DiskActivity.DoesNotExist:
            # latest() raises on an empty queryset; a disk without any
            # activity cannot be downloading.
            return False
        return (latest.activity_code == "storage.Disk.download"
                and latest.succeeded is None)

    def get_download_percentage(self):
        """Return the progress of the disk's running download.

        :return: The ``percent`` entry of the download task's result info,
                 or None if the disk is not downloading or the task has not
                 published a dict of progress data.
        """
        if not self.is_downloading():
            return None

        # Restrict the lookup to this disk: the latest activity overall may
        # belong to a different disk.
        latest = DiskActivity.objects.filter(disk=self).latest("created")
        result = celery.AsyncResult(id=latest.task_uuid)
        info = result.info
        # ``info`` is only a dict when the task reported progress metadata;
        # it may also be None or an exception instance.
        if not isinstance(info, dict):
            return None
        return info.get("percent")

168
    @property
    def is_deletable(self):
        """True if the associated file can be deleted.

        That requires the disk to have been destroyed more than one day
        ago and all of its children to be deletable as well.
        """
        if self.destroyed is None:
            return False

        cutoff = timezone.now() - timedelta(days=1)
        if not self.destroyed < cutoff:
            return False

        return self.children_deletable
176

177 178 179
    @property
    def children_deletable(self):
        """True if every disk derived from this one is deletable.

        A disk without derivatives yields True.
        """
        for child in self.derivatives.all():
            if not child.is_deletable:
                return False
        return True
182

183
    @property
    def is_in_use(self):
        """True if disk is attached to an active VM.

        A VM counts as active unless its state is STOPPED; per the original
        note, VMs in any other state leave the disk in an inconsistent state.
        """
        for instance in self.instance_set.all():
            if instance.state != 'STOPPED':
                return True
        return False
191

192 193
    def get_exclusive(self):
        """Get an instance of the disk for exclusive usage.

        Creates a new Disk record based on this one, with the type derived
        from this disk's type.  This method manipulates the database only.

        :raises WrongDiskTypeError: if the disk's type has no exclusive
                                    counterpart in the mapping below.
        """
        exclusive_types = {
            'qcow2-norm': 'qcow2-snap',
            'iso': 'iso',
            'raw-ro': 'raw-rw',
        }

        try:
            new_type = exclusive_types[self.type]
        except KeyError:
            raise self.WrongDiskTypeError(self.type)

        params = dict(base=self, datastore=self.datastore,
                      name=self.name, size=self.size,
                      type=new_type)
        return Disk.create(**params)
211 212

    def get_vmdisk_desc(self):
        """Serialize disk object to the vmdriver.

        :return: Description with the keys ``source``, ``driver_type``,
                 ``driver_cache``, ``target_device`` and ``disk_device``.
        :rtype: dict
        """
        # ``path`` is a plain method on this class, so reading the attribute
        # alone would put a bound method into the description.  Call it when
        # it is callable; this also keeps working if it becomes a property.
        source = self.path
        if callable(source):
            source = source()

        return {
            'source': source,
            'driver_type': self.vm_format,
            'driver_cache': 'none',
            'target_device': self.device_type + self.dev_num,
            'disk_device': 'cdrom' if self.type == 'iso' else 'disk'
        }

223
    def get_disk_desc(self):
        """Serialize disk object to the storage driver.

        A disk with a base is described as a 'snapshot' of that base's
        file; a disk without one is 'normal' and has no base name.
        """
        desc = {
            'name': self.filename,
            'dir': self.datastore.path,
            'format': self.format,
            'size': self.size,
        }
        parent = self.base
        desc['base_name'] = parent.filename if parent else None
        desc['type'] = 'snapshot' if parent else 'normal'
        return desc

235
    def get_remote_queue_name(self, queue_id='storage', check_worker=True):
        """Returns the queue name built by the disk's datastore.

        Both arguments are handed over to the datastore's method of the
        same name; None is returned when the datastore is falsy.
        """
        store = self.datastore
        if not store:
            return None
        return store.get_remote_queue_name(queue_id, check_worker)

243
    def __unicode__(self):
        name = self.name
        # Disks without an id yet are shown with #0.
        ident = self.id or 0
        return u"%s (#%d)" % (name, ident)
245

246 247 248 249 250
    def clean(self, *args, **kwargs):
        """Take the size over from the base disk when none was given.

        The parent classes' clean() runs afterwards with the same arguments.
        """
        if self.size == "":
            parent = self.base
            if parent:
                self.size = parent.size
        super(Disk, self).clean(*args, **kwargs)

251
    def deploy(self, user=None, task_uuid=None, timeout=15):
        """Reify the disk model on the associated data store.

        :param self: the disk model to reify
        :type self: storage.models.Disk

        :param user: The user who's issuing the command.
        :type user: django.contrib.auth.models.User

        :param task_uuid: The task's UUID, if the command is being executed
                          asynchronously.
        :type task_uuid: str

        :param timeout: Timeout passed to ``get()`` when waiting for the
                        remote create/snapshot task.
        :type timeout: int

        :return: True if a new reification of the disk has been created;
                 otherwise (the disk is already ready), False.
        :rtype: bool
        """
        if self.destroyed:
            # Deploying clears an earlier destroyed mark first.
            self.destroyed = None
            self.save()

        if self.ready:
            return False

        with disk_activity(code_suffix='deploy', disk=self,
                           task_uuid=task_uuid, user=user) as act:

            # Delegate create / snapshot jobs
            queue_name = self.get_remote_queue_name('storage')
            disk_desc = self.get_disk_desc()
            if self.base is not None:
                with act.sub_activity('creating_snapshot'):
                    remote_tasks.snapshot.apply_async(args=[disk_desc],
                                                      queue=queue_name
                                                      ).get(timeout=timeout)
            else:
                with act.sub_activity('creating_disk'):
                    remote_tasks.create.apply_async(args=[disk_desc],
                                                    queue=queue_name
                                                    ).get(timeout=timeout)

            # ``ready`` is a read-only property computed from the activity
            # log, so it must not be assigned here (that raised
            # AttributeError); the deploy activity logged by this block is
            # what the property looks for.
            # The save is kept from the original flow -- TODO confirm it is
            # still needed now that no field changes at this point.
            self.save()

            return True
296

297
    def deploy_async(self, user=None):
        """Queue the deploy task for this disk and return its result handle.

        The task is sent to the "localhost.man" queue with this disk and
        the given user as arguments.
        """
        task = local_tasks.deploy
        return task.apply_async(args=[self, user], queue="localhost.man")
302

303 304
    @classmethod
    def create(cls, **params):
        """Create and save a disk record with a freshly generated filename.

        :param params: Field values for the new disk.  ``datastore`` is
                       optional; when it is left out the only DataStore in
                       the database is used.

        :return: The saved disk.
        :rtype: Disk

        :raises DataStore.DoesNotExist: if no datastore was given and none
                                        exists.
        :raises DataStore.MultipleObjectsReturned: if no datastore was given
                                                   and several exist.
        """
        # Look the default up only when it is needed: as a pop() default it
        # was evaluated on every call and could fail even though the caller
        # had supplied a datastore.
        if 'datastore' in params:
            datastore = params.pop('datastore')
        else:
            datastore = DataStore.objects.get()
        disk = cls(filename=str(uuid.uuid4()), datastore=datastore, **params)
        disk.save()
        return disk
309

310
    @classmethod
    def create_empty(cls, instance=None, user=None, **kwargs):
        """Create a Disk record that has no image behind it yet.

        :param instance: Instance or template to attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
        :param user: Creator of the disk.
        :type user: django.contrib.auth.User
        :param kwargs: Passed on to ``create``.

        :return: Disk object without a real image, to be .deploy()ed later.
        """
        new_disk = cls.create(**kwargs)
        activity = disk_activity(code_suffix="create", disk=new_disk,
                                 user=user)
        with activity:
            if instance:
                instance.disks.add(new_disk)
            return new_disk
326 327

    @classmethod
    def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
        """Create disk object and download data from url asynchronously.

        :param url: URL of image to download.
        :type url: string
        :param instance: Instance or template to attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
        :param user: owner of the disk
        :type user: django.contrib.auth.User

        :return: Task
        :rtype: AsyncResult
        """
        # The named arguments travel to the task together with the extras.
        kwargs.update(cls=cls, url=url, instance=instance, user=user)
        task = local_tasks.create_from_url
        return task.apply_async(kwargs=kwargs, queue='localhost.man')
345

346
    @classmethod
    def create_from_url(cls, url, instance=None, user=None,
                        task_uuid=None, abortable_task=None, **kwargs):
        """Create disk object and download data from url synchronously.

        :param url: image url to download.
        :type url: url
        :param instance: Instance or template to attach the Disk to.
        :type instance: vm.models.Instance or InstanceTemplate or NoneType
        :param user: owner of the disk
        :type user: django.contrib.auth.User
        :param task_uuid: UUID stored on the download activity and sent to
                          the remote task as ``parent_id``.
        :param abortable_task: Optional task object; while waiting for the
                               download its ``is_aborted()`` is polled and
                               an abort is forwarded to the remote task.

        :return: The created Disk object
        :rtype: Disk
        """
        # Without an explicit name the last segment of the URL is used.
        kwargs.setdefault('name', url.rsplit('/', 1)[-1])
        disk = Disk.create(type="iso", size=1, **kwargs)
        # TODO get proper datastore
        disk.datastore = DataStore.objects.get()
        if instance:
            instance.disks.add(disk)
        queue_name = disk.get_remote_queue_name('storage')

        if abortable_task:
            from celery.contrib.abortable import AbortableAsyncResult

            class AbortException(Exception):
                pass

        def _mark_destroyed(activity, error):
            # Abort handler of the download activity: flag the disk as
            # destroyed.
            activity.disk.destroyed = timezone.now()
            activity.disk.save()

        with disk_activity(code_suffix='download', disk=disk,
                           task_uuid=task_uuid, user=user,
                           on_abort=_mark_destroyed):
            download = remote_tasks.download.apply_async(
                kwargs={'url': url, 'parent_id': task_uuid,
                        'disk': disk.get_disk_desc()},
                queue=queue_name)
            # Wait in 5 second slices so an abort request is noticed.
            while True:
                try:
                    size = download.get(timeout=5)
                except TimeoutError:
                    if abortable_task and abortable_task.is_aborted():
                        AbortableAsyncResult(download.id).abort()
                        raise AbortException("Download aborted by user.")
                else:
                    break
            disk.size = size
            # NOTE(review): ``ready`` is declared on this class as a property
            # without a setter, so this assignment looks like it raises
            # AttributeError -- confirm how readiness should be recorded.
            disk.ready = True
            disk.save()
        return disk
400

401
    def destroy(self, user=None, task_uuid=None):
        """Mark the disk as destroyed by stamping it with the current time.

        :return: False if the disk was already destroyed, True otherwise.
        :rtype: bool
        """
        if self.destroyed:
            return False

        activity = disk_activity(code_suffix='destroy', disk=self,
                                 task_uuid=task_uuid, user=user)
        with activity:
            self.destroyed = timezone.now()
            self.save()

            return True
411

412
    def destroy_async(self, user=None):
        """Queue the destroy task for this disk and return its result handle.

        The task is sent to the 'localhost.man' queue with this disk and
        the given user as arguments.
        """
        task = local_tasks.destroy
        return task.apply_async(args=[self, user], queue='localhost.man')
417

418
    def restore(self, user=None, task_uuid=None):
        """Recover destroyed disk from trash if possible.

        Not implemented yet: the method does nothing and returns None.
        """
        # TODO

    def restore_async(self, user=None):
        """Execute restore asynchronously.

        :param user: The user who's issuing the command.

        :return: Result handle of the queued task, as returned by the other
                 ``*_async`` methods of this class.
        """
        return local_tasks.restore.apply_async(args=[self, user],
                                               queue='localhost.man')

428 429 430 431 432
    def save_as_async(self, disk, task_uuid=None, timeout=300, user=None):
        """Queue the save_as task and return its result handle.

        The task is sent to the "localhost.man" queue with ``disk``,
        ``timeout`` and ``user`` as positional arguments.

        NOTE(review): the ``disk`` argument is forwarded rather than
        ``self``, and ``task_uuid`` is not used -- confirm both are intended.
        """
        task_args = [disk, timeout, user]
        return local_tasks.save_as.apply_async(args=task_args,
                                               queue="localhost.man")

    def save_as(self, user=None, task_uuid=None, timeout=300):
        """Merge this snapshot disk into a new disk of type qcow2-norm.

        A new Disk record is created on the same datastore, with this
        disk's base as its base, and the remote ``merge`` task is run with
        the descriptions of both disks.  Per the original note, the VM must
        be in STOPPED state to perform this action.

        :param user: The user who's issuing the command.
        :param task_uuid: The task's UUID, recorded on the logged activity.
        :param timeout: Not used now; the remote call is awaited without a
                        time limit.

        :return: The newly created disk.
        :rtype: Disk

        :raises WrongDiskTypeError: if the disk is not of type qcow2-snap.
        :raises DiskInUseError: if the disk is in use.
        """
        mapping = {
            'qcow2-snap': ('qcow2-norm', self.base),
        }
        if self.type not in mapping:
            raise self.WrongDiskTypeError(self.type)

        if self.is_in_use:
            raise self.DiskInUseError(self)

        # from this point on, the caller has to guarantee that the disk is not
        # going to be used until the operation is complete

        new_type, new_base = mapping[self.type]

        # Disk.create() already saves the new record.
        disk = Disk.create(base=new_base, datastore=self.datastore,
                           name=self.name, size=self.size,
                           type=new_type)

        # Hand the caller's task UUID to the activity instead of dropping it.
        with disk_activity(code_suffix="save_as", disk=self,
                           user=user, task_uuid=task_uuid):
            queue_name = self.get_remote_queue_name('storage')
            remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
                                                 disk.get_disk_desc()],
                                           queue=queue_name
                                           ).get()  # Timeout
            # NOTE(review): ``ready`` is declared on this class as a property
            # without a setter, so this assignment looks like it raises
            # AttributeError -- confirm how readiness should be recorded.
            disk.ready = True
            disk.save()

        return disk
468 469 470 471 472 473 474 475


class DiskActivity(ActivityModel):

    """Activity log entry that belongs to a Disk.

    Entries are reachable from the disk through ``activity_log``.
    """
    disk = ForeignKey(Disk, verbose_name=_('disk'),
                      related_name='activity_log',
                      help_text=_('Disk this activity works on.'))

    @classmethod
    def create(cls, code_suffix, disk, task_uuid=None, user=None):
        """Save and return a new top-level activity for ``disk``.

        The activity code is 'storage.Disk.' followed by ``code_suffix``;
        the start time is the current time.
        """
        fields = dict(activity_code='storage.Disk.' + code_suffix,
                      disk=disk, parent=None, started=timezone.now(),
                      task_uuid=task_uuid, user=user)
        activity = cls(**fields)
        activity.save()
        return activity

    def create_sub(self, code_suffix, task_uuid=None):
        """Save and return a child activity of this one.

        The child's code is this activity's code, a dot and
        ``code_suffix``; disk and user are taken over from the parent.
        """
        fields = dict(activity_code=self.activity_code + '.' + code_suffix,
                      disk=self.disk, parent=self, started=timezone.now(),
                      task_uuid=task_uuid, user=self.user)
        child = DiskActivity(**fields)
        child.save()
        return child

    @contextmanager
    def sub_activity(self, code_suffix, task_uuid=None):
        """Context manager wrapping a newly created child activity."""
        child = self.create_sub(code_suffix, task_uuid)
        # Nothing is yielded here; the object returned by activitycontextimpl
        # is what @contextmanager drives (presumably a generator -- see
        # common.models to confirm).
        return activitycontextimpl(child)
495

496

497
@contextmanager
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
                  on_abort=None, on_commit=None):
    """Context manager wrapping a newly created activity of ``disk``.

    The activity is created through DiskActivity.create; ``on_abort`` and
    ``on_commit`` are handed over to activitycontextimpl unchanged.
    """
    activity = DiskActivity.create(code_suffix=code_suffix, disk=disk,
                                   task_uuid=task_uuid, user=user)
    # Nothing is yielded here; the object returned by activitycontextimpl is
    # what @contextmanager drives (presumably a generator -- see
    # common.models to confirm).
    return activitycontextimpl(activity, on_abort=on_abort,
                               on_commit=on_commit)