# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

from manager.mancelery import celery
from celery.contrib.abortable import AbortableTask


@celery.task
def check_queue(storage, queue_id, priority):
    ''' Celery inspect job to check for active workers at a queue.

    The queue name is built as "<storage>.<queue_id>"; when priority is
    not None, ".<priority>" is appended to it.

    :param storage: first component of the queue name (string)
    :param queue_id: second component of the queue name (string)
    :param priority: optional last component of the queue name, or None
    :return: True if any worker reports a queue with that name in
             inspect().active_queues(); False otherwise, including when
             active_queues() returns None
    '''
    queue_name = storage + "." + queue_id
    if priority is not None:
        queue_name = queue_name + "." + priority

    inspect = celery.control.inspect()
    # Reply timeout used by the inspect call below.
    inspect.timeout = 0.5
    active_queues = inspect.active_queues()
    if active_queues is None:
        return False

    # values() rather than itervalues(): the latter exists only on
    # Python 2 dicts, while values() works on both Python 2 and 3.
    return any(queue['name'] == queue_name
               for worker_queues in active_queues.values()
               for queue in worker_queues)
39 40 41


@celery.task
def save_as(disk, timeout, user):
    ''' Celery task that calls disk.save_disk_as.

    The id of the current task request is passed as task_uuid; disk,
    user and timeout are forwarded as keyword arguments unchanged.
    '''
    task_id = save_as.request.id
    disk.save_disk_as(disk=disk, user=user, timeout=timeout,
                      task_uuid=task_id)


@celery.task
def clone(disk, new_disk, timeout, user):
    ''' Celery task that calls disk.clone.

    :param disk: object whose clone() method is invoked
    :param new_disk: forwarded to clone() as the `disk` keyword argument
    :param timeout: forwarded to clone() unchanged
    :param user: forwarded to clone() unchanged
    '''
    # Use this task's own request id; the previous code read
    # save_as.request.id, i.e. the request context of a different task.
    disk.clone(task_uuid=clone.request.id, user=user,
               disk=new_disk, timeout=timeout)


@celery.task
def deploy(disk, user):
    ''' Celery task that calls disk.deploy.

    The id of the current task request is passed as task_uuid, and
    user is forwarded unchanged.
    '''
    task_id = deploy.request.id
    disk.deploy(user=user, task_uuid=task_id)
Guba Sándor committed
56 57


Guba Sándor committed
58
@celery.task
def destroy(disk, user):
    ''' Celery task that calls disk.destroy.

    The id of the current task request is passed as task_uuid, and
    user is forwarded unchanged.
    '''
    task_id = destroy.request.id
    disk.destroy(user=user, task_uuid=task_id)
61 62 63 64 65


@celery.task
def restore(disk, user):
    ''' Celery task that calls disk.restore.

    The id of the current task request is passed as task_uuid, and
    user is forwarded unchanged.
    '''
    task_id = restore.request.id
    disk.restore(user=user, task_uuid=task_id)
66 67


68 69 70 71 72 73 74
@celery.task(base=AbortableTask, bind=True)
def create_from_url(self, **kwargs):
    ''' Bound, abortable Celery task that calls cls.create_from_url.

    Required keyword arguments:
      cls -- object whose create_from_url() is invoked
      url -- forwarded as the `url` keyword argument

    Any remaining keyword arguments are forwarded unchanged, together
    with this task's request id (task_uuid) and the task instance
    itself (abortable_task).
    '''
    disk_cls = kwargs.pop('cls')
    source_url = kwargs.pop('url')
    disk_cls.create_from_url(
        url=source_url,
        task_uuid=self.request.id,
        abortable_task=self,
        **kwargs
    )
75 76 77


@celery.task
def create_empty(Disk, instance, user, params):
    ''' Celery task that calls Disk.create_empty.

    instance and user are passed positionally, the id of the current
    task request is passed as task_uuid, and the entries of params are
    expanded into additional keyword arguments.
    '''
    task_id = create_empty.request.id
    Disk.create_empty(instance, user, task_uuid=task_id, **params)