local_tasks.py 1.81 KB
Newer Older
1
from manager.mancelery import celery
2
from celery.contrib.abortable import AbortableTask
Guba Sándor committed
3 4 5


@celery.task
6 7 8 9 10 11 12 13
def check_queue(storage, queue_id):
    ''' Celery inspect job to check for active workers at queue_id
        return True/False
    '''
    drivers = ['storage', 'download']
    worker_list = [storage + "." + d for d in drivers]
    queue_name = storage + "." + queue_id
    active_queues = celery.control.inspect(worker_list).active_queues()
14 15 16 17 18 19
    if active_queues is None:
        return False

    queue_names = (queue['name'] for worker in active_queues.itervalues()
                   for queue in worker)
    return queue_name in queue_names
20 21 22


@celery.task
23 24 25 26 27 28
def save_as(disk, timeout, user):
    disk.save_disk_as(task_uuid=save_as.request.id, user=user,
                      disk=disk, timeout=timeout)


@celery.task
29 30 31 32 33 34
def clone(disk, new_disk, timeout, user):
    disk.clone(task_uuid=save_as.request.id, user=user,
               disk=new_disk, timeout=timeout)


@celery.task
Guba Sándor committed
35
def deploy(disk, user):
36
    disk.deploy(task_uuid=deploy.request.id, user=user)
Guba Sándor committed
37 38


Guba Sándor committed
39
@celery.task
40 41
def destroy(disk, user):
    disk.destroy(task_uuid=destroy.request.id, user=user)
42 43 44 45 46


@celery.task
def restore(disk, user):
    disk.restore(task_uuid=restore.request.id, user=user)
47 48


49 50 51 52
class CreateFromURLTask(AbortableTask):

    def __init__(self):
        self.bind(celery)
53 54

    def run(self, **kwargs):
55 56
        Disk = kwargs.pop('cls')
        Disk.create_from_url(url=kwargs.pop('url'),
57 58
                             task_uuid=create_from_url.request.id,
                             abortable_task=self,
59 60
                             **kwargs)
create_from_url = CreateFromURLTask()
61 62 63


@celery.task
64 65 66 67
def create_empty(Disk, instance, user, params):
    Disk.create_empty(instance, user,
                      task_uuid=create_empty.request.id,
                      **params)