local_tasks.py 1.6 KB
Newer Older
1
from manager.mancelery import celery
2
from celery.contrib.abortable import AbortableTask
Guba Sándor committed
3 4 5


@celery.task
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
def check_queue(storage, queue_id):
    ''' Celery inspect job to check for active workers at queue_id
        return True/False
    '''
    drivers = ['storage', 'download']
    worker_list = [storage + "." + d for d in drivers]
    queue_name = storage + "." + queue_id
    # v is List of List of queues dict
    active_queues = celery.control.inspect(worker_list).active_queues()
    if active_queues is not None:
        node_workers = [v for k, v in active_queues.iteritems()]
        for worker in node_workers:
            for queue in worker:
                if queue['name'] == queue_name:
                    return True
    return False


@celery.task
Guba Sándor committed
25
def deploy(disk, user):
26
    disk.deploy(task_uuid=deploy.request.id, user=user)
Guba Sándor committed
27 28


Guba Sándor committed
29
@celery.task
30 31
def destroy(disk, user):
    disk.destroy(task_uuid=destroy.request.id, user=user)
32 33 34 35 36


@celery.task
def restore(disk, user):
    disk.restore(task_uuid=restore.request.id, user=user)
37 38


39 40 41 42 43 44 45 46 47 48 49 50
class create_from_url(AbortableTask):

    def run(self, **kwargs):
        Disk = kwargs['cls']
        url = kwargs['url']
        params = kwargs['params']
        user = kwargs['user']
        Disk.create_from_url(url=url,
                             params=params,
                             task_uuid=create_from_url.request.id,
                             abortable_task=self,
                             user=user)
51 52 53 54 55 56


@celery.task
def create_empty(Disk, instance, params, user):
    Disk.create_empty(instance, params, user,
                      task_uuid=create_empty.request.id)