Commit d1b95123 by Guba Sándor Committed by Őry Máté

storage: refactor models

parent 3bcecbcd
...@@ -19,13 +19,12 @@ ...@@ -19,13 +19,12 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from contextlib import contextmanager
import logging import logging
from os.path import join from os.path import join
import uuid import uuid
from celery.signals import worker_ready from celery.contrib.abortable import AbortableAsyncResult
from django.db.models import (Model, CharField, DateTimeField, from django.db.models import (Model, BooleanField, CharField, DateTimeField,
ForeignKey) ForeignKey)
from django.utils import timezone from django.utils import timezone
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
...@@ -33,11 +32,9 @@ from model_utils.models import TimeStampedModel ...@@ -33,11 +32,9 @@ from model_utils.models import TimeStampedModel
from sizefield.models import FileSizeField from sizefield.models import FileSizeField
from acl.models import AclBase from acl.models import AclBase
from .tasks import local_tasks, remote_tasks from .tasks import local_tasks, storage_tasks
from celery.exceptions import TimeoutError from celery.exceptions import TimeoutError
from manager.mancelery import celery from common.models import WorkerNotFound
from common.models import (ActivityModel, activitycontextimpl,
WorkerNotFound)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -64,8 +61,12 @@ class DataStore(Model): ...@@ -64,8 +61,12 @@ class DataStore(Model):
logger.debug("Checking for storage queue %s.%s", logger.debug("Checking for storage queue %s.%s",
self.hostname, queue_id) self.hostname, queue_id)
if not check_worker or local_tasks.check_queue(self.hostname, if not check_worker or local_tasks.check_queue(self.hostname,
queue_id, priority): queue_id,
return self.hostname + '.' + queue_id priority):
queue_name = self.hostname + '.' + queue_id
if priority is not None:
queue_name = queue_name + '.' + priority
return queue_name
else: else:
raise WorkerNotFound() raise WorkerNotFound()
...@@ -99,6 +100,8 @@ class Disk(AclBase, TimeStampedModel): ...@@ -99,6 +100,8 @@ class Disk(AclBase, TimeStampedModel):
verbose_name=_("device number")) verbose_name=_("device number"))
destroyed = DateTimeField(blank=True, default=None, null=True) destroyed = DateTimeField(blank=True, default=None, null=True)
is_ready = BooleanField(default=False)
class Meta: class Meta:
ordering = ['name'] ordering = ['name']
verbose_name = _('disk') verbose_name = _('disk')
...@@ -142,22 +145,6 @@ class Disk(AclBase, TimeStampedModel): ...@@ -142,22 +145,6 @@ class Disk(AclBase, TimeStampedModel):
self.disk = disk self.disk = disk
@property @property
def is_ready(self):
""" Returns True if the disk is physically ready on the storage.
It needs at least 1 successfull deploy action.
"""
return self.activity_log.filter(activity_code__endswith="deploy",
succeeded=True)
@property
def failed(self):
""" Returns True if the last activity on the disk is failed.
"""
result = self.activity_log.all().order_by('-id')[0].succeeded
return not (result is None) and not result
@property
def path(self): def path(self):
"""The path where the files are stored. """The path where the files are stored.
""" """
...@@ -199,24 +186,6 @@ class Disk(AclBase, TimeStampedModel): ...@@ -199,24 +186,6 @@ class Disk(AclBase, TimeStampedModel):
'raw-rw': 'vd', 'raw-rw': 'vd',
}[self.type] }[self.type]
def is_downloading(self):
return self.size is None and not self.failed
def get_download_percentage(self):
if not self.is_downloading():
return None
try:
task = self.activity_log.filter(
activity_code__endswith="deploy",
succeeded__isnull=True)[0].task_uuid
result = celery.AsyncResult(id=task)
return result.info.get("percent")
except:
return 0
def get_latest_activity_result(self):
return self.activity_log.latest("pk").result
@property @property
def is_deletable(self): def is_deletable(self):
"""True if the associated file can be deleted. """True if the associated file can be deleted.
...@@ -334,92 +303,36 @@ class Disk(AclBase, TimeStampedModel): ...@@ -334,92 +303,36 @@ class Disk(AclBase, TimeStampedModel):
if self.is_ready: if self.is_ready:
return True return True
with disk_activity(code_suffix='deploy', disk=self, queue_name = self.get_remote_queue_name('storage', priority="fast")
task_uuid=task_uuid, user=user) as act: disk_desc = self.get_disk_desc()
if self.base is not None:
# Delegate create / snapshot jobs storage_tasks.snapshot.apply_async(args=[disk_desc],
queue_name = self.get_remote_queue_name('storage') queue=queue_name
disk_desc = self.get_disk_desc() ).get(timeout=timeout)
if self.base is not None: else:
with act.sub_activity('creating_snapshot'): storage_tasks.create.apply_async(args=[disk_desc],
remote_tasks.snapshot.apply_async(args=[disk_desc], queue=queue_name
queue=queue_name ).get(timeout=timeout)
).get(timeout=timeout)
else:
with act.sub_activity('creating_disk'):
remote_tasks.create.apply_async(args=[disk_desc],
queue=queue_name
).get(timeout=timeout)
return True return True
def deploy_async(self, user=None):
"""Execute deploy asynchronously.
"""
return local_tasks.deploy.apply_async(args=[self, user],
queue="localhost.man")
@classmethod @classmethod
def create(cls, instance=None, user=None, **params): def create(cls, user=None, **params):
"""Create disk with activity. disk = cls.__create(user, params)
"""
datastore = params.pop('datastore', DataStore.objects.get())
filename = params.pop('filename', str(uuid.uuid4()))
disk = cls(filename=filename, datastore=datastore, **params)
disk.clean() disk.clean()
disk.save() disk.save()
logger.debug("Disk created: %s", params) logger.debug("Disk created: %s", params)
with disk_activity(code_suffix="create",
user=user,
disk=disk):
if instance:
instance.disks.add(disk)
return disk return disk
@classmethod @classmethod
def create_empty_async(cls, instance=None, user=None, **kwargs): def __create(cls, user, params):
"""Execute deploy asynchronously. datastore = params.pop('datastore', DataStore.objects.get())
""" filename = params.pop('filename', str(uuid.uuid4()))
return local_tasks.create_empty.apply_async( disk = cls(filename=filename, datastore=datastore, **params)
args=[cls, instance, user, kwargs], queue="localhost.man")
@classmethod
def create_empty(cls, instance=None, user=None, task_uuid=None, **kwargs):
"""Create empty Disk object.
:param instance: Instance or template attach the Disk to.
:type instance: vm.models.Instance or InstanceTemplate or NoneType
:param user: Creator of the disk.
:type user: django.contrib.auth.User
:return: Disk object without a real image, to be .deploy()ed later.
"""
disk = Disk.create(instance, user, **kwargs)
disk.deploy(user=user, task_uuid=task_uuid)
return disk return disk
@classmethod @classmethod
def create_from_url_async(cls, url, instance=None, user=None, **kwargs): def download(cls, url, task, user=None, **params):
"""Create disk object and download data from url asynchrnously.
:param url: URL of image to download.
:type url: string
:param instance: Instance or template attach the Disk to.
:type instance: vm.models.Instance or InstanceTemplate or NoneType
:param user: owner of the disk
:type user: django.contrib.auth.User
:return: Task
:rtype: AsyncResult
"""
kwargs.update({'cls': cls, 'url': url,
'instance': instance, 'user': user})
return local_tasks.create_from_url.apply_async(
kwargs=kwargs, queue='localhost.man')
@classmethod
def create_from_url(cls, url, instance=None, user=None,
task_uuid=None, abortable_task=None, **kwargs):
"""Create disk object and download data from url synchronusly. """Create disk object and download data from url synchronusly.
:param url: image url to download. :param url: image url to download.
...@@ -434,57 +347,32 @@ class Disk(AclBase, TimeStampedModel): ...@@ -434,57 +347,32 @@ class Disk(AclBase, TimeStampedModel):
:return: The created Disk object :return: The created Disk object
:rtype: Disk :rtype: Disk
""" """
kwargs.setdefault('name', url.split('/')[-1]) params.setdefault('name', url.split('/')[-1])
disk = Disk.create(type="iso", instance=instance, user=user, disk = cls.__create(type="iso", user=user, size=None, **params)
size=None, **kwargs) queue_name = disk.get_remote_queue_name('storage', priority='slow')
queue_name = disk.get_remote_queue_name('storage') remote = storage_tasks.download.apply_async(
kwargs={'url': url, 'parent_id': task.id,
def __on_abort(activity, error): 'disk': disk.get_disk_desc()},
activity.disk.destroyed = timezone.now() queue=queue_name)
activity.disk.save() while True:
try:
if abortable_task: size = remote.get(timeout=5)
from celery.contrib.abortable import AbortableAsyncResult break
except TimeoutError:
class AbortException(Exception): if task is not None and task.is_aborted():
pass AbortableAsyncResult(remote.id).abort()
raise Exception("Download aborted by user.")
with disk_activity(code_suffix='deploy', disk=disk, disk.size = size
task_uuid=task_uuid, user=user, disk.save()
on_abort=__on_abort) as act:
with act.sub_activity('downloading_disk'):
result = remote_tasks.download.apply_async(
kwargs={'url': url, 'parent_id': task_uuid,
'disk': disk.get_disk_desc()},
queue=queue_name)
while True:
try:
size = result.get(timeout=5)
break
except TimeoutError:
if abortable_task and abortable_task.is_aborted():
AbortableAsyncResult(result.id).abort()
raise AbortException("Download aborted by user.")
disk.size = size
disk.save()
return disk return disk
def destroy(self, user=None, task_uuid=None): def destroy(self, user=None, task_uuid=None):
if self.destroyed: if self.destroyed:
return False return False
with disk_activity(code_suffix='destroy', disk=self, self.destroyed = timezone.now()
task_uuid=task_uuid, user=user): self.save()
self.destroyed = timezone.now() return True
self.save()
return True
def destroy_async(self, user=None):
"""Execute destroy asynchronously.
"""
return local_tasks.destroy.apply_async(args=[self, user],
queue='localhost.man')
def restore(self, user=None, task_uuid=None): def restore(self, user=None, task_uuid=None):
"""Recover destroyed disk from trash if possible. """Recover destroyed disk from trash if possible.
...@@ -492,62 +380,6 @@ class Disk(AclBase, TimeStampedModel): ...@@ -492,62 +380,6 @@ class Disk(AclBase, TimeStampedModel):
# TODO # TODO
pass pass
def restore_async(self, user=None):
local_tasks.restore.apply_async(args=[self, user],
queue='localhost.man')
def clone_async(self, new_disk=None, timeout=300, user=None):
"""Clone a Disk to another Disk
:param new_disk: optional, the new Disk object to clone in
:type new_disk: storage.models.Disk
:param user: Creator of the disk.
:type user: django.contrib.auth.User
:return: AsyncResult
"""
return local_tasks.clone.apply_async(args=[self, new_disk,
timeout, user],
queue="localhost.man")
def clone(self, disk=None, user=None, task_uuid=None, timeout=300):
"""Cloning Disk into another Disk.
The Disk.type can'T be snapshot.
:param new_disk: optional, the new Disk object to clone in
:type new_disk: storage.models.Disk
:param user: Creator of the disk.
:type user: django.contrib.auth.User
:return: the cloned Disk object.
"""
banned_types = ['qcow2-snap']
if self.type in banned_types:
raise self.WrongDiskTypeError(self.type)
if self.is_in_use:
raise self.DiskInUseError(self)
if not self.is_ready:
raise self.DiskIsNotReady(self)
if not disk:
base = None
if self.type == "iso":
base = self
disk = Disk.create(datastore=self.datastore,
name=self.name, size=self.size,
type=self.type, base=base)
with disk_activity(code_suffix="clone", disk=self,
user=user, task_uuid=task_uuid):
with disk_activity(code_suffix="deploy", disk=disk,
user=user, task_uuid=task_uuid):
queue_name = self.get_remote_queue_name('storage')
remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
disk.get_disk_desc()],
queue=queue_name
).get() # Timeout
return disk
def save_as(self, user=None, task_uuid=None, timeout=300): def save_as(self, user=None, task_uuid=None, timeout=300):
"""Save VM as template. """Save VM as template.
...@@ -582,65 +414,11 @@ class Disk(AclBase, TimeStampedModel): ...@@ -582,65 +414,11 @@ class Disk(AclBase, TimeStampedModel):
name=self.name, size=self.size, name=self.name, size=self.size,
type=new_type) type=new_type)
with disk_activity(code_suffix="save_as", disk=self, queue_name = self.get_remote_queue_name("storage", priority="slow")
user=user, task_uuid=task_uuid): storage_tasks.merge.apply_async(args=[self.get_disk_desc(),
with disk_activity(code_suffix="deploy", disk=disk, disk.get_disk_desc()],
user=user, task_uuid=task_uuid): queue=queue_name
queue_name = self.get_remote_queue_name('storage') ).get() # Timeout
remote_tasks.merge.apply_async(args=[self.get_disk_desc(), disk.is_ready = True
disk.get_disk_desc()], disk.save()
queue=queue_name return disk
).get() # Timeout
return disk
class DiskActivity(ActivityModel):
disk = ForeignKey(Disk, related_name='activity_log',
help_text=_('Disk this activity works on.'),
verbose_name=_('disk'))
@classmethod
def create(cls, code_suffix, disk, task_uuid=None, user=None):
act = cls(activity_code='storage.Disk.' + code_suffix,
disk=disk, parent=None, started=timezone.now(),
task_uuid=task_uuid, user=user)
act.save()
return act
def __unicode__(self):
if self.parent:
return '{}({})->{}'.format(self.parent.activity_code,
self.disk,
self.activity_code)
else:
return '{}({})'.format(self.activity_code,
self.disk)
def create_sub(self, code_suffix, task_uuid=None):
act = DiskActivity(
activity_code=self.activity_code + '.' + code_suffix,
disk=self.disk, parent=self, started=timezone.now(),
task_uuid=task_uuid, user=self.user)
act.save()
return act
@contextmanager
def sub_activity(self, code_suffix, task_uuid=None):
act = self.create_sub(code_suffix, task_uuid)
return activitycontextimpl(act)
@contextmanager
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
on_abort=None, on_commit=None):
act = DiskActivity.create(code_suffix, disk, task_uuid, user)
return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)
@worker_ready.connect()
def cleanup(conf=None, **kwargs):
# TODO check if other manager workers are running
for i in DiskActivity.objects.filter(finished__isnull=True):
i.finish(False, "Manager is restarted, activity is cleaned up. "
"You can try again now.")
logger.error('Forced finishing stale activity %s', i)
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from manager.mancelery import celery
@celery.task(name='storagedriver.list')
def list(dir):
pass
@celery.task(name='storagedriver.list_files')
def list_files(dir):
pass
@celery.task(name='storagedriver.create')
def create(disk_desc):
pass
@celery.task(name='storagedriver.download')
def download(disk_desc, url):
pass
@celery.task(name='storagedriver.delete')
def delete(path):
pass
@celery.task(name='storagedriver.delete_dump')
def delete_dump(path):
pass
@celery.task(name='storagedriver.snapshot')
def snapshot(disk_desc):
pass
@celery.task(name='storagedriver.get')
def get(path):
pass
@celery.task(name='storagedriver.merge')
def merge(src_disk_desc, dst_disk_desc):
pass
@celery.task(name='storagedriver.make_free_space')
def make_free_space(datastore, percent):
pass
@celery.task(name='storagedriver.move_to_trash')
def move_to_trash(datastore, disk_path):
pass
@celery.task(name='storagedriver.get_storage_stat')
def get_storage_stat(path):
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment