Commit 86d76cc1 by Bach Dániel

Merge branch 'feature-abort-download-rebased' into 'master'

Feature Abort Download

This merge request is a complete rework of the storage subsystem
parents 23ed986c 3ff7bd9d
......@@ -33,6 +33,7 @@ class Operation(object):
required_perms = ()
do_not_call_in_templates = True
abortable = False
has_percentage = False
def __call__(self, **kwargs):
return self.call(**kwargs)
......
......@@ -38,24 +38,8 @@
"datastore": 1,
"dev_num": "a",
"type": "qcow2-norm",
"size": 8589934592
}
},
{
"pk": 1,
"model": "storage.diskactivity",
"fields":{
"activity_code": "storage.Disk.create",
"succeeded": true,
"parent": null,
"created": "2014-03-18T15:44:37.671Z",
"started": "2014-03-18T15:44:37.671Z",
"finished": "2014-03-18T15:44:37.677Z",
"modified": "2014-03-18T15:44:37.679Z",
"task_uuid": null,
"user": 1,
"disk": 1,
"result":null
"size": 8589934592,
"is_ready": true
}
},
{
......
......@@ -24,6 +24,7 @@ from django.contrib.auth.forms import (
PasswordChangeForm,
)
from django.contrib.auth.models import User, Group
from django.core.validators import URLValidator
from crispy_forms.helper import FormHelper
from crispy_forms.layout import (
......@@ -40,9 +41,9 @@ from django.utils.translation import ugettext as _
from sizefield.widgets import FileSizeWidget
from firewall.models import Vlan, Host
from storage.models import Disk, DataStore
from storage.models import Disk
from vm.models import (
InstanceTemplate, Lease, InterfaceTemplate, Node, Trait, Instance
InstanceTemplate, Lease, InterfaceTemplate, Node, Trait
)
from .models import Profile, GroupProfile
......@@ -846,20 +847,12 @@ class LeaseForm(forms.ModelForm):
model = Lease
class DiskAddForm(forms.Form):
name = forms.CharField()
size = forms.CharField(widget=FileSizeWidget, required=False)
url = forms.CharField(required=False)
is_template = forms.CharField()
object_pk = forms.CharField()
def __init__(self, *args, **kwargs):
self.is_template = kwargs.pop("is_template")
self.object_pk = kwargs.pop("object_pk")
self.user = kwargs.pop("user")
super(DiskAddForm, self).__init__(*args, **kwargs)
self.initial['is_template'] = 1 if self.is_template else 0
self.initial['object_pk'] = self.object_pk
class VmCreateDiskForm(forms.Form):
name = forms.CharField(max_length=100, label=_("Name"))
size = forms.CharField(
widget=FileSizeWidget, initial=(10 << 30), label=_('Size'),
help_text=_('Size of disk to create in bytes or with units '
'like MB or GB.'))
def clean_size(self):
size_in_bytes = self.cleaned_data.get("size")
......@@ -868,66 +861,23 @@ class DiskAddForm(forms.Form):
" GB or MB!"))
return size_in_bytes
def clean(self):
cleaned_data = self.cleaned_data
size = cleaned_data.get("size")
url = cleaned_data.get("url")
if not size and not url:
msg = _("You have to either specify size or URL")
self._errors[_("Global")] = self.error_class([msg])
return cleaned_data
def save(self, commit=True):
data = self.cleaned_data
@property
def helper(self):
helper = FormHelper(self)
helper.form_tag = False
return helper
if self.is_template:
inst = InstanceTemplate.objects.get(pk=self.object_pk)
else:
inst = Instance.objects.get(pk=self.object_pk)
if data['size']:
kwargs = {
'name': data['name'],
'type': "qcow2-norm",
'datastore': DataStore.objects.all()[0],
'size': data['size'],
}
d = Disk.create_empty(instance=inst, user=self.user, **kwargs)
else:
kwargs = {
'name': data['name'],
'url': data['url'],
}
Disk.create_from_url_async(instance=inst, user=self.user,
**kwargs)
d = None
return d
class VmDownloadDiskForm(forms.Form):
name = forms.CharField(max_length=100, label=_("Name"))
url = forms.CharField(label=_('URL'), validators=[URLValidator(), ])
@property
def helper(self):
helper = FormHelper()
helper.form_show_labels = False
helper.layout = Layout(
Field("is_template", type="hidden"),
Field("object_pk", type="hidden"),
Field("name", placeholder=_("Name")),
Field("size", placeholder=_("Disk size (for example: 20GB, "
"1500MB)")),
Field("url", placeholder=_("URL to an ISO image")),
AnyTag(
"div",
HTML(
_("Either specify the size for an empty disk or a URL "
"to an ISO image!")
),
css_class="alert alert-info",
style="padding: 5px; text-align: justify;",
),
)
helper.add_input(Submit("submit", _("Add"),
helper = FormHelper(self)
helper.add_input(Submit("submit", _("Create"),
css_class="btn btn-success"))
helper.form_tag = False
return helper
......
......@@ -3,7 +3,7 @@
$(function() {
/* vm operations */
$('#ops').on('click', '.operation.btn', function(e) {
$('#ops, #vm-details-resources-disk').on('click', '.operation.btn', function(e) {
var icon = $(this).children("i").addClass('icon-spinner icon-spin');
$.ajax({
......
......@@ -105,17 +105,6 @@
</ul>
</div>
</div>
<div class="panel panel-default">
<div class="panel-heading">
<h4 class="no-margin"><i class="icon-folder-open"></i> {% trans "Create new disk" %}</h4>
</div>
<div class="panel-body">
<form action="{% url "dashboard.views.disk-add" %}" method="POST">
{% crispy disk_add_form %}
</form>
</div>
</div>
</div><!-- .col-md-4 -->
</div><!-- .row -->
......
......@@ -13,6 +13,9 @@
{% include "dashboard/_display-name.html" with user=a.user show_org=True %}
</a>
{% endif %}
{% if a.has_percent %}
{{ a.percentage }}%
{% endif %}
{% if a.is_abortable_for_user %}
<form action="{{ a.instance.get_absolute_url }}" method="POST" class="pull-right">
{% csrf_token %}
......
{% load i18n %}
{% for op in ops %}
{% if op.show_in_toolbar %}
<a href="{{op.get_url}}" class="operation operation-{{op.op}} btn btn-default btn-xs"
title="{{op.name}}: {{op.description}}">
<i class="icon-{{op.icon}}"></i>
<span class="sr-only">{{op.name}}</span>
</a>
{% endif %}
{% endfor %}
......@@ -47,9 +47,18 @@
<h3>
{% trans "Disks" %}
<div class="pull-right">
<a href="#" id="vm-details-disk-add" class="btn btn-success btn-xs">
<i class="icon-plus"></i> {% trans "Add new disk" %}
</a>
{% if op.download_disk %}
<a href="{{op.download_disk.get_url}}" class="btn btn-success btn-xs
operation operation-{{op.download_disk.op}} btn btn-default">
<i class="icon-{{op.download_disk.icon}}"></i>
{{op.download_disk.name}} </a>
{% endif %}
{% if op.create_disk %}
<a href="{{op.create_disk.get_url}}" class="btn btn-success btn-xs
operation operation-{{op.create_disk.op}} btn btn-default">
<i class="icon-{{op.create_disk.icon}}"></i>
{{op.create_disk.name}} </a>
{% endif %}
</div>
</h3>
......@@ -68,17 +77,6 @@
</div>
</div>
<div class="js-hidden row" id="vm-details-disk-add-form">
<div class="col-md-12">
<div>
<hr />
<form method="POST" action="{% url "dashboard.views.disk-add" %}" style="max-width: 350px;">
{% crispy forms.disk_add_form %}
</form>
<hr />
</div>
</div>
</div>
{% block extra_js %}
<style>
......
......@@ -17,7 +17,7 @@
import json
from unittest import skip
#from unittest import skip
from django.test import TestCase
from django.test.client import Client
from django.contrib.auth.models import User, Group
......@@ -333,38 +333,6 @@ class VmDetailTest(LoginMixin, TestCase):
self.assertEqual(response.status_code, 302)
self.assertEqual(leases, Lease.objects.count())
def test_unpermitted_vm_disk_add(self):
c = Client()
self.login(c, "user2")
inst = Instance.objects.get(pk=1)
inst.set_level(self.u1, 'owner')
disks = inst.disks.count()
response = c.post("/dashboard/disk/add/", {
'disk-name': "a",
'disk-size': 1,
'disk-is_template': 0,
'disk-object_pk': 1,
})
self.assertEqual(response.status_code, 403)
self.assertEqual(disks, inst.disks.count())
@skip("until fix merged")
def test_permitted_vm_disk_add(self):
c = Client()
self.login(c, "user1")
inst = Instance.objects.get(pk=1)
inst.set_level(self.u1, 'owner')
# disks = inst.disks.count()
response = c.post("/dashboard/disk/add/", {
'disk-name': "a",
'disk-size': 1,
'disk-is_template': 0,
'disk-object_pk': 1,
})
self.assertEqual(response.status_code, 302)
# mancelery is needed TODO
# self.assertEqual(disks + 1, inst.disks.count())
def test_notification_read(self):
c = Client()
self.login(c, "user1")
......
......@@ -20,7 +20,7 @@ from django.conf.urls import patterns, url, include
from vm.models import Instance
from .views import (
AclUpdateView, DiskAddView, FavouriteView, GroupAclUpdateView, GroupDelete,
AclUpdateView, FavouriteView, GroupAclUpdateView, GroupDelete,
GroupDetailView, GroupList, IndexView,
InstanceActivityDetail, LeaseCreate, LeaseDelete, LeaseDetail,
MyPreferencesView, NodeAddTraitView, NodeCreate, NodeDelete,
......@@ -128,8 +128,6 @@ urlpatterns = patterns(
url(r'^notifications/$', NotificationView.as_view(),
name="dashboard.views.notifications"),
url(r'^disk/add/$', DiskAddView.as_view(),
name="dashboard.views.disk-add"),
url(r'^disk/(?P<pk>\d+)/remove/$', DiskRemoveView.as_view(),
name="dashboard.views.disk-remove"),
url(r'^disk/(?P<pk>\d+)/status/$', get_disk_download_status,
......
......@@ -45,7 +45,6 @@ from django.views.generic import (TemplateView, DetailView, View, DeleteView,
from django.contrib import messages
from django.utils.translation import ugettext as _
from django.utils.translation import ungettext as __
from django.template.defaultfilters import title as title_filter
from django.template.loader import render_to_string
from django.template import RequestContext
from django.templatetags.static import static
......@@ -57,10 +56,11 @@ from braces.views import (LoginRequiredMixin, SuperuserRequiredMixin,
from braces.views._access import AccessMixin
from .forms import (
CircleAuthenticationForm, DiskAddForm, HostForm, LeaseForm, MyProfileForm,
CircleAuthenticationForm, HostForm, LeaseForm, MyProfileForm,
NodeForm, TemplateForm, TraitForm, VmCustomizeForm, GroupCreateForm,
UserCreationForm, GroupProfileUpdateForm, UnsubscribeForm,
CirclePasswordChangeForm, VmSaveForm,
VmSaveForm,
CirclePasswordChangeForm, VmCreateDiskForm, VmDownloadDiskForm,
)
from .tables import (
......@@ -277,12 +277,6 @@ class VmDetailView(CheckedDetailView):
instance=self.get_object()).values_list("vlan", flat=True)
).all()
context['acl'] = get_vm_acl_data(instance)
context['forms'] = {
'disk_add_form': DiskAddForm(
user=self.request.user,
is_template=False, object_pk=self.get_object().pk,
prefix="disk"),
}
context['os_type_icon'] = instance.os_type.replace("unknown",
"question")
# ipv6 infos
......@@ -602,6 +596,22 @@ class FormOperationMixin(object):
return self.get(request)
class VmCreateDiskView(FormOperationMixin, VmOperationView):
op = 'create_disk'
form_class = VmCreateDiskForm
show_in_toolbar = False
icon = 'hdd'
class VmDownloadDiskView(FormOperationMixin, VmOperationView):
op = 'download_disk'
form_class = VmDownloadDiskForm
show_in_toolbar = False
icon = 'download'
class VmMigrateView(VmOperationView):
op = 'migrate'
......@@ -641,6 +651,8 @@ vm_ops = {
'destroy': VmOperationView.factory(op='destroy', icon='remove'),
'sleep': VmOperationView.factory(op='sleep', icon='moon'),
'wake_up': VmOperationView.factory(op='wake_up', icon='sun'),
'create_disk': VmCreateDiskView,
'download_disk': VmDownloadDiskView,
}
......@@ -1066,12 +1078,6 @@ class TemplateDetail(LoginRequiredMixin, SuccessMessageMixin, UpdateView):
context = super(TemplateDetail, self).get_context_data(**kwargs)
context['acl'] = get_vm_acl_data(obj)
context['disks'] = obj.disks.all()
context['disk_add_form'] = DiskAddForm(
user=self.request.user,
is_template=True,
object_pk=obj.pk,
prefix="disk",
)
return context
def get_success_url(self):
......@@ -2525,47 +2531,6 @@ def circle_login(request):
return response
class DiskAddView(TemplateView):
def post(self, *args, **kwargs):
is_template = self.request.POST.get("disk-is_template")
object_pk = self.request.POST.get("disk-object_pk")
is_template = int(is_template) == 1
if is_template:
obj = InstanceTemplate.objects.get(pk=object_pk)
else:
obj = Instance.objects.get(pk=object_pk)
if not obj.has_level(self.request.user, 'owner'):
raise PermissionDenied()
form = DiskAddForm(
self.request.POST,
user=self.request.user,
is_template=is_template, object_pk=object_pk,
prefix="disk"
)
if form.is_valid():
if form.cleaned_data.get("size"):
messages.success(self.request, _("Disk successfully added."))
else:
messages.success(self.request, _("Disk download started."))
form.save()
else:
error = "<br /> ".join(["<strong>%s</strong>: %s" %
(title_filter(i[0]), i[1][0])
for i in form.errors.items()])
messages.error(self.request, error)
if is_template:
r = obj.get_absolute_url()
else:
r = obj.get_absolute_url()
r = "%s#resources" % r
return redirect(r)
class MyPreferencesView(UpdateView):
model = Profile
......
......@@ -18,7 +18,7 @@
from django import contrib
# from django.utils.translation import ugettext_lazy as _
from .models import Disk, DataStore, DiskActivity
from .models import Disk, DataStore
class DiskAdmin(contrib.admin.ModelAdmin):
......@@ -31,5 +31,4 @@ class DataStoreAdmin(contrib.admin.ModelAdmin):
contrib.admin.site.register(Disk, DiskAdmin)
contrib.admin.site.register(DiskActivity)
contrib.admin.site.register(DataStore, DataStoreAdmin)
# -*- coding: utf-8 -*-
from south.utils import datetime_utils as datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Deleting model 'DiskActivity'
db.delete_table(u'storage_diskactivity')
# Adding field 'Disk.is_ready'
db.add_column(u'storage_disk', 'is_ready',
self.gf('django.db.models.fields.BooleanField')(default=False),
keep_default=False)
# Changing field 'Disk.size'
db.alter_column(u'storage_disk', 'size', self.gf('sizefield.models.FileSizeField')(null=True))
def backwards(self, orm):
# Adding model 'DiskActivity'
db.create_table(u'storage_diskactivity', (
('task_uuid', self.gf('django.db.models.fields.CharField')(unique=True, max_length=50, null=True, blank=True)),
('parent', self.gf('django.db.models.fields.related.ForeignKey')(related_name='children', null=True, to=orm['storage.DiskActivity'], blank=True)),
('started', self.gf('django.db.models.fields.DateTimeField')(null=True, blank=True)),
('finished', self.gf('django.db.models.fields.DateTimeField')(null=True, blank=True)),
('result', self.gf('django.db.models.fields.TextField')(null=True, blank=True)),
('disk', self.gf('django.db.models.fields.related.ForeignKey')(related_name='activity_log', to=orm['storage.Disk'])),
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('activity_code', self.gf('django.db.models.fields.CharField')(max_length=100)),
('succeeded', self.gf('django.db.models.fields.NullBooleanField')(null=True, blank=True)),
('created', self.gf('model_utils.fields.AutoCreatedField')(default=datetime.datetime.now)),
('modified', self.gf('model_utils.fields.AutoLastModifiedField')(default=datetime.datetime.now)),
('user', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['auth.User'], null=True, blank=True)),
))
db.send_create_signal(u'storage', ['DiskActivity'])
# Deleting field 'Disk.is_ready'
db.delete_column(u'storage_disk', 'is_ready')
# Changing field 'Disk.size'
db.alter_column(u'storage_disk', 'size', self.gf('sizefield.models.FileSizeField')(default=None))
models = {
u'storage.datastore': {
'Meta': {'ordering': "[u'name']", 'object_name': 'DataStore'},
'hostname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '40'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '100'}),
'path': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '200'})
},
u'storage.disk': {
'Meta': {'ordering': "[u'name']", 'object_name': 'Disk'},
'base': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "u'derivatives'", 'null': 'True', 'to': u"orm['storage.Disk']"}),
'created': ('model_utils.fields.AutoCreatedField', [], {'default': 'datetime.datetime.now'}),
'datastore': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['storage.DataStore']"}),
'destroyed': ('django.db.models.fields.DateTimeField', [], {'default': 'None', 'null': 'True', 'blank': 'True'}),
'dev_num': ('django.db.models.fields.CharField', [], {'default': "u'a'", 'max_length': '1'}),
'filename': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '256'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'is_ready': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'modified': ('model_utils.fields.AutoLastModifiedField', [], {'default': 'datetime.datetime.now'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '100', 'blank': 'True'}),
'size': ('sizefield.models.FileSizeField', [], {'default': 'None', 'null': 'True'}),
'type': ('django.db.models.fields.CharField', [], {'max_length': '10'})
}
}
complete_apps = ['storage']
\ No newline at end of file
......@@ -19,13 +19,12 @@
from __future__ import unicode_literals
from contextlib import contextmanager
import logging
from os.path import join
import uuid
from celery.signals import worker_ready
from django.db.models import (Model, CharField, DateTimeField,
from celery.contrib.abortable import AbortableAsyncResult
from django.db.models import (Model, BooleanField, CharField, DateTimeField,
ForeignKey)
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
......@@ -33,11 +32,9 @@ from model_utils.models import TimeStampedModel
from sizefield.models import FileSizeField
from acl.models import AclBase
from .tasks import local_tasks, remote_tasks
from .tasks import local_tasks, storage_tasks
from celery.exceptions import TimeoutError
from manager.mancelery import celery
from common.models import (ActivityModel, activitycontextimpl,
WorkerNotFound)
from common.models import WorkerNotFound
logger = logging.getLogger(__name__)
......@@ -64,8 +61,12 @@ class DataStore(Model):
logger.debug("Checking for storage queue %s.%s",
self.hostname, queue_id)
if not check_worker or local_tasks.check_queue(self.hostname,
queue_id, priority):
return self.hostname + '.' + queue_id
queue_id,
priority):
queue_name = self.hostname + '.' + queue_id
if priority is not None:
queue_name = queue_name + '.' + priority
return queue_name
else:
raise WorkerNotFound()
......@@ -99,6 +100,8 @@ class Disk(AclBase, TimeStampedModel):
verbose_name=_("device number"))
destroyed = DateTimeField(blank=True, default=None, null=True)
is_ready = BooleanField(default=False)
class Meta:
ordering = ['name']
verbose_name = _('disk')
......@@ -142,22 +145,6 @@ class Disk(AclBase, TimeStampedModel):
self.disk = disk
@property
def is_ready(self):
""" Returns True if the disk is physically ready on the storage.
It needs at least 1 successfull deploy action.
"""
return self.activity_log.filter(activity_code__endswith="deploy",
succeeded=True)
@property
def failed(self):
""" Returns True if the last activity on the disk is failed.
"""
result = self.activity_log.all().order_by('-id')[0].succeeded
return not (result is None) and not result
@property
def path(self):
"""The path where the files are stored.
"""
......@@ -199,24 +186,6 @@ class Disk(AclBase, TimeStampedModel):
'raw-rw': 'vd',
}[self.type]
def is_downloading(self):
return self.size is None and not self.failed
def get_download_percentage(self):
if not self.is_downloading():
return None
try:
task = self.activity_log.filter(
activity_code__endswith="deploy",
succeeded__isnull=True)[0].task_uuid
result = celery.AsyncResult(id=task)
return result.info.get("percent")
except:
return 0
def get_latest_activity_result(self):
return self.activity_log.latest("pk").result
@property
def is_deletable(self):
"""True if the associated file can be deleted.
......@@ -334,92 +303,38 @@ class Disk(AclBase, TimeStampedModel):
if self.is_ready:
return True
with disk_activity(code_suffix='deploy', disk=self,
task_uuid=task_uuid, user=user) as act:
# Delegate create / snapshot jobs
queue_name = self.get_remote_queue_name('storage')
queue_name = self.get_remote_queue_name('storage', priority="fast")
disk_desc = self.get_disk_desc()
if self.base is not None:
with act.sub_activity('creating_snapshot'):
remote_tasks.snapshot.apply_async(args=[disk_desc],
storage_tasks.snapshot.apply_async(args=[disk_desc],
queue=queue_name
).get(timeout=timeout)
else:
with act.sub_activity('creating_disk'):
remote_tasks.create.apply_async(args=[disk_desc],
storage_tasks.create.apply_async(args=[disk_desc],
queue=queue_name
).get(timeout=timeout)
self.is_ready = True
self.save()
return True
def deploy_async(self, user=None):
"""Execute deploy asynchronously.
"""
return local_tasks.deploy.apply_async(args=[self, user],
queue="localhost.man")
@classmethod
def create(cls, instance=None, user=None, **params):
"""Create disk with activity.
"""
datastore = params.pop('datastore', DataStore.objects.get())
filename = params.pop('filename', str(uuid.uuid4()))
disk = cls(filename=filename, datastore=datastore, **params)
def create(cls, user=None, **params):
disk = cls.__create(user, params)
disk.clean()
disk.save()
logger.debug("Disk created: %s", params)
with disk_activity(code_suffix="create",
user=user,
disk=disk):
if instance:
instance.disks.add(disk)
return disk
@classmethod
def create_empty_async(cls, instance=None, user=None, **kwargs):
"""Execute deploy asynchronously.
"""
return local_tasks.create_empty.apply_async(
args=[cls, instance, user, kwargs], queue="localhost.man")
@classmethod
def create_empty(cls, instance=None, user=None, task_uuid=None, **kwargs):
"""Create empty Disk object.
:param instance: Instance or template attach the Disk to.
:type instance: vm.models.Instance or InstanceTemplate or NoneType
:param user: Creator of the disk.
:type user: django.contrib.auth.User
:return: Disk object without a real image, to be .deploy()ed later.
"""
disk = Disk.create(instance, user, **kwargs)
disk.deploy(user=user, task_uuid=task_uuid)
def __create(cls, user, params):
datastore = params.pop('datastore', DataStore.objects.get())
filename = params.pop('filename', str(uuid.uuid4()))
disk = cls(filename=filename, datastore=datastore, **params)
return disk
@classmethod
def create_from_url_async(cls, url, instance=None, user=None, **kwargs):
"""Create disk object and download data from url asynchrnously.
:param url: URL of image to download.
:type url: string
:param instance: Instance or template attach the Disk to.
:type instance: vm.models.Instance or InstanceTemplate or NoneType
:param user: owner of the disk
:type user: django.contrib.auth.User
:return: Task
:rtype: AsyncResult
"""
kwargs.update({'cls': cls, 'url': url,
'instance': instance, 'user': user})
return local_tasks.create_from_url.apply_async(
kwargs=kwargs, queue='localhost.man')
@classmethod
def create_from_url(cls, url, instance=None, user=None,
task_uuid=None, abortable_task=None, **kwargs):
def download(cls, url, task, user=None, **params):
"""Create disk object and download data from url synchronusly.
:param url: image url to download.
......@@ -434,38 +349,25 @@ class Disk(AclBase, TimeStampedModel):
:return: The created Disk object
:rtype: Disk
"""
kwargs.setdefault('name', url.split('/')[-1])
disk = Disk.create(type="iso", instance=instance, user=user,
size=None, **kwargs)
queue_name = disk.get_remote_queue_name('storage')
def __on_abort(activity, error):
activity.disk.destroyed = timezone.now()
activity.disk.save()
if abortable_task:
from celery.contrib.abortable import AbortableAsyncResult
class AbortException(Exception):
pass
with disk_activity(code_suffix='deploy', disk=disk,
task_uuid=task_uuid, user=user,
on_abort=__on_abort) as act:
with act.sub_activity('downloading_disk'):
result = remote_tasks.download.apply_async(
kwargs={'url': url, 'parent_id': task_uuid,
params.setdefault('name', url.split('/')[-1])
params.setdefault('type', 'iso')
params.setdefault('size', None)
disk = cls.__create(params=params, user=user)
queue_name = disk.get_remote_queue_name('storage', priority='slow')
remote = storage_tasks.download.apply_async(
kwargs={'url': url, 'parent_id': task.request.id,
'disk': disk.get_disk_desc()},
queue=queue_name)
while True:
try:
size = result.get(timeout=5)
size = remote.get(timeout=5)
break
except TimeoutError:
if abortable_task and abortable_task.is_aborted():
AbortableAsyncResult(result.id).abort()
raise AbortException("Download aborted by user.")
if task is not None and task.is_aborted():
AbortableAsyncResult(remote.id).abort()
raise Exception("Download aborted by user.")
disk.size = size
disk.is_ready = True
disk.save()
return disk
......@@ -473,81 +375,16 @@ class Disk(AclBase, TimeStampedModel):
if self.destroyed:
return False
with disk_activity(code_suffix='destroy', disk=self,
task_uuid=task_uuid, user=user):
self.destroyed = timezone.now()
self.save()
return True
def destroy_async(self, user=None):
"""Execute destroy asynchronously.
"""
return local_tasks.destroy.apply_async(args=[self, user],
queue='localhost.man')
def restore(self, user=None, task_uuid=None):
"""Recover destroyed disk from trash if possible.
"""
# TODO
pass
def restore_async(self, user=None):
local_tasks.restore.apply_async(args=[self, user],
queue='localhost.man')
def clone_async(self, new_disk=None, timeout=300, user=None):
"""Clone a Disk to another Disk
:param new_disk: optional, the new Disk object to clone in
:type new_disk: storage.models.Disk
:param user: Creator of the disk.
:type user: django.contrib.auth.User
:return: AsyncResult
"""
return local_tasks.clone.apply_async(args=[self, new_disk,
timeout, user],
queue="localhost.man")
def clone(self, disk=None, user=None, task_uuid=None, timeout=300):
"""Cloning Disk into another Disk.
The Disk.type can'T be snapshot.
:param new_disk: optional, the new Disk object to clone in
:type new_disk: storage.models.Disk
:param user: Creator of the disk.
:type user: django.contrib.auth.User
:return: the cloned Disk object.
"""
banned_types = ['qcow2-snap']
if self.type in banned_types:
raise self.WrongDiskTypeError(self.type)
if self.is_in_use:
raise self.DiskInUseError(self)
if not self.is_ready:
raise self.DiskIsNotReady(self)
if not disk:
base = None
if self.type == "iso":
base = self
disk = Disk.create(datastore=self.datastore,
name=self.name, size=self.size,
type=self.type, base=base)
with disk_activity(code_suffix="clone", disk=self,
user=user, task_uuid=task_uuid):
with disk_activity(code_suffix="deploy", disk=disk,
user=user, task_uuid=task_uuid):
queue_name = self.get_remote_queue_name('storage')
remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
disk.get_disk_desc()],
queue=queue_name
).get() # Timeout
return disk
def save_as(self, user=None, task_uuid=None, timeout=300):
"""Save VM as template.
......@@ -582,65 +419,11 @@ class Disk(AclBase, TimeStampedModel):
name=self.name, size=self.size,
type=new_type)
with disk_activity(code_suffix="save_as", disk=self,
user=user, task_uuid=task_uuid):
with disk_activity(code_suffix="deploy", disk=disk,
user=user, task_uuid=task_uuid):
queue_name = self.get_remote_queue_name('storage')
remote_tasks.merge.apply_async(args=[self.get_disk_desc(),
queue_name = self.get_remote_queue_name("storage", priority="slow")
storage_tasks.merge.apply_async(args=[self.get_disk_desc(),
disk.get_disk_desc()],
queue=queue_name
).get() # Timeout
disk.is_ready = True
disk.save()
return disk
class DiskActivity(ActivityModel):
disk = ForeignKey(Disk, related_name='activity_log',
help_text=_('Disk this activity works on.'),
verbose_name=_('disk'))
@classmethod
def create(cls, code_suffix, disk, task_uuid=None, user=None):
act = cls(activity_code='storage.Disk.' + code_suffix,
disk=disk, parent=None, started=timezone.now(),
task_uuid=task_uuid, user=user)
act.save()
return act
def __unicode__(self):
if self.parent:
return '{}({})->{}'.format(self.parent.activity_code,
self.disk,
self.activity_code)
else:
return '{}({})'.format(self.activity_code,
self.disk)
def create_sub(self, code_suffix, task_uuid=None):
act = DiskActivity(
activity_code=self.activity_code + '.' + code_suffix,
disk=self.disk, parent=self, started=timezone.now(),
task_uuid=task_uuid, user=self.user)
act.save()
return act
@contextmanager
def sub_activity(self, code_suffix, task_uuid=None):
act = self.create_sub(code_suffix, task_uuid)
return activitycontextimpl(act)
@contextmanager
def disk_activity(code_suffix, disk, task_uuid=None, user=None,
on_abort=None, on_commit=None):
act = DiskActivity.create(code_suffix, disk, task_uuid, user)
return activitycontextimpl(act, on_abort=on_abort, on_commit=on_commit)
@worker_ready.connect()
def cleanup(conf=None, **kwargs):
# TODO check if other manager workers are running
for i in DiskActivity.objects.filter(finished__isnull=True):
i.finish(False, "Manager is restarted, activity is cleaned up. "
"You can try again now.")
logger.error('Forced finishing stale activity %s', i)
......@@ -18,7 +18,7 @@
from storage.models import DataStore
from manager.mancelery import celery
import logging
from storage.tasks import remote_tasks
from storage.tasks import storage_tasks
logger = logging.getLogger(__name__)
......@@ -34,18 +34,18 @@ def garbage_collector(timeout=15):
:type timeout: int
"""
for ds in DataStore.objects.all():
queue_name = ds.get_remote_queue_name('storage')
files = set(remote_tasks.list_files.apply_async(
queue_name = ds.get_remote_queue_name('storage', priority='fast')
files = set(storage_tasks.list_files.apply_async(
args=[ds.path], queue=queue_name).get(timeout=timeout))
disks = set(ds.get_deletable_disks())
queue_name = ds.get_remote_queue_name('storage')
queue_name = ds.get_remote_queue_name('storage', priority='slow')
for i in disks & files:
logger.info("Image: %s at Datastore: %s moved to trash folder." %
(i, ds.path))
remote_tasks.move_to_trash.apply_async(
storage_tasks.move_to_trash.apply_async(
args=[ds.path, i], queue=queue_name).get(timeout=timeout)
try:
remote_tasks.make_free_space.apply_async(
storage_tasks.make_free_space.apply_async(
args=[ds.path], queue=queue_name).get(timeout=timeout)
except Exception as e:
logger.warning(str(e))
......@@ -63,7 +63,7 @@ def list_orphan_disks(timeout=15):
import re
for ds in DataStore.objects.all():
queue_name = ds.get_remote_queue_name('storage')
files = set(remote_tasks.list_files.apply_async(
files = set(storage_tasks.list_files.apply_async(
args=[ds.path], queue=queue_name).get(timeout=timeout))
disks = set([disk.filename for disk in ds.disk_set.all()])
for i in files - disks:
......@@ -80,7 +80,7 @@ def list_missing_disks(timeout=15):
"""
for ds in DataStore.objects.all():
queue_name = ds.get_remote_queue_name('storage')
files = set(remote_tasks.list_files.apply_async(
files = set(storage_tasks.list_files.apply_async(
args=[ds.path], queue=queue_name).get(timeout=timeout))
disks = set([disk.filename for disk in
ds.disk_set.filter(destroyed__isnull=True)])
......
......@@ -19,7 +19,7 @@ from datetime import timedelta
from django.test import TestCase
from django.utils import timezone
from mock import MagicMock, Mock
from mock import MagicMock
from ..models import Disk, DataStore
......@@ -99,11 +99,6 @@ class DiskTestCase(TestCase):
with self.assertRaises(MockException):
Disk.save_as(d)
def test_download_percentage_no_download(self):
d = MagicMock(spec=Disk)
d.is_downloading = Mock(return_value=False)
assert Disk.get_download_percentage(d) is None
def test_undeployed_disk_ready(self):
d = self._disk()
assert not d.is_ready
......@@ -116,6 +116,20 @@ class InstanceActivity(ActivityModel):
else:
return 'failed'
def has_percentage(self):
op = self.instance.get_operation_from_activity_code(self.activity_code)
return (self.task_uuid and op and op.has_percentage
and not self.finished)
def get_percentage(self):
"""Returns the percentage of the running operation if available.
"""
result = celery.AsyncResult(id=self.task_uuid)
if self.has_percentage() and result.info is not None:
return result.info.get("percent")
else:
return 0
@property
def is_abortable(self):
"""Can the activity be aborted?
......
......@@ -910,6 +910,11 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
acts = (self.activity_log.filter(parent=None).
order_by('-started').
select_related('user').prefetch_related('children'))
# Check latest activity for percentage
for i in acts:
if i.has_percentage():
i.has_percent = True
i.percentage = i.get_percentage()
if user is not None:
for i in acts:
i.is_abortable_for_user = partial(i.is_abortable_for,
......
......@@ -26,6 +26,7 @@ from django.utils.translation import ugettext_lazy as _
from celery.exceptions import TimeLimitExceeded
from common.operations import Operation, register_operation
from storage.models import Disk
from .tasks.local_tasks import (
abortable_async_instance_operation, abortable_async_node_operation,
)
......@@ -99,24 +100,48 @@ class AddInterfaceOperation(InstanceOperation):
register_operation(AddInterfaceOperation)
class AddDiskOperation(InstanceOperation):
activity_code_suffix = 'add_disk'
id = 'add_disk'
name = _("add disk")
description = _("Add the specified disk to the VM.")
class CreateDiskOperation(InstanceOperation):
activity_code_suffix = 'create_disk'
id = 'create_disk'
name = _("create disk")
description = _("Create empty disk for the VM.")
def check_precond(self):
super(AddDiskOperation, self).check_precond()
super(CreateDiskOperation, self).check_precond()
# TODO remove check when hot-attach is implemented
if self.instance.status not in ['STOPPED']:
raise self.instance.WrongStateError(self.instance)
def _operation(self, activity, user, system, disk):
def _operation(self, user, size, name=None):
# TODO implement with hot-attach when it'll be available
return self.instance.disks.add(disk)
if not name:
name = "new disk"
disk = Disk.create(size=size, name=name, type="qcow2-norm")
self.instance.disks.add(disk)
register_operation(CreateDiskOperation)
class DownloadDiskOperation(InstanceOperation):
activity_code_suffix = 'download_disk'
id = 'download_disk'
name = _("download disk")
description = _("Download disk for the VM.")
abortable = True
has_percentage = True
def check_precond(self):
super(DownloadDiskOperation, self).check_precond()
# TODO remove check when hot-attach is implemented
if self.instance.status not in ['STOPPED']:
raise self.instance.WrongStateError(self.instance)
def _operation(self, user, url, task, name=None):
# TODO implement with hot-attach when it'll be available
disk = Disk.download(url=url, name=name, task=task)
self.instance.disks.add(disk)
register_operation(AddDiskOperation)
register_operation(DownloadDiskOperation)
class DeployOperation(InstanceOperation):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment