Commit 2bd2679c by Guba Sándor

storage: reworked create_from_url to be abortable

parent 95663de7
...@@ -13,6 +13,7 @@ from sizefield.models import FileSizeField ...@@ -13,6 +13,7 @@ from sizefield.models import FileSizeField
from acl.models import AclBase from acl.models import AclBase
from .tasks import local_tasks, remote_tasks from .tasks import local_tasks, remote_tasks
from celery.exceptions import TimeoutError
from common.models import ActivityModel, activitycontextimpl, WorkerNotFound from common.models import ActivityModel, activitycontextimpl, WorkerNotFound
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -236,32 +237,52 @@ class Disk(AclBase, TimeStampedModel): ...@@ -236,32 +237,52 @@ class Disk(AclBase, TimeStampedModel):
return disk return disk
@classmethod @classmethod
def create_from_url_async(cls, url, params, user=None): def create_from_url_async(cls, url, params=None, user=None):
return local_tasks.create_from_url.apply_async(args=[cls, url, params, return local_tasks.create_from_url.apply_async(kwargs={
user], 'cls': cls, 'url': url, 'params': params, 'user': user},
queue='localhost.man') queue='localhost.man')
@classmethod def create_from_url(cls, url, params={}, user=None, task_uuid=None,
def create_from_url(cls, url, params={}, user=None, task_uuid=None): abortable_task=None):
disk = cls() disk = cls()
disk.filename = str(uuid.uuid4()) disk.filename = str(uuid.uuid4())
disk.type = "iso" disk.type = "iso"
disk.size = 1 disk.size = 1
disk.datastore = DataStore.objects.all()[0] disk.datastore = DataStore.objects.all()[0]
disk.__dict__.update(params) if params:
disk.__dict__.update(params)
disk.save() disk.save()
queue_name = disk.get_remote_queue_name('storage') queue_name = disk.get_remote_queue_name('storage')
def __onabort(activity, error): def __on_abort(activity, error):
activity.disk.delete() activity.disk.destroyed = timezone.now()
raise error activity.disk.save()
if abortable_task:
from celery.contrib.abortable import AbortableAsyncResult
class AbortException(Exception):
pass
with disk_activity(code_suffix='download', disk=disk, with disk_activity(code_suffix='download', disk=disk,
task_uuid=task_uuid, user=user): task_uuid=task_uuid, user=user,
size = remote_tasks.download.apply_async( on_abort=__on_abort):
kwargs={'url': url, 'disk': disk.get_disk_desc()}, result = remote_tasks.download.apply_async(
queue=queue_name).get() kwargs={'url': url, 'parent_id': task_uuid,
'disk': disk.get_disk_desc()},
queue=queue_name)
while True:
try:
size = result.get(timeout=5)
break
except TimeoutError:
logger.info(abortable_task)
logger.info(abortable_task.is_aborted())
if abortable_task and abortable_task.is_aborted():
AbortableAsyncResult(result.id).abort()
raise AbortException("Download aborted by user.")
disk.size = size disk.size = size
disk.ready = True
disk.save() disk.save()
def destroy(self, user=None, task_uuid=None): def destroy(self, user=None, task_uuid=None):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment