mancelery.py 2.39 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

18
from celery import Celery
19
from celery.signals import worker_ready
20
from datetime import timedelta
21
from kombu import Queue, Exchange
Guba Sándor committed
22
from os import getenv
23

Guba Sándor committed
24
# Host prefix from which this manager's queue name is built.
HOSTNAME = "localhost"
# Cache location handed to Celery as its cache result backend; can be
# overridden through the CACHE_URI environment variable.
CACHE_URI = getenv("CACHE_URI", "pylibmc://127.0.0.1:11211/")
# Name of the manager queue; also passed to the activity cleanup that runs
# when the worker becomes ready.
QUEUE_NAME = HOSTNAME + '.man'

28

29
# Celery application of the manager. The broker URL is taken from the
# AMQP_URI environment variable (no default is supplied here, so getenv
# yields None when it is unset); `include` names the task modules that
# belong to this application.
celery = Celery('manager',
                broker=getenv("AMQP_URI"),
                include=['vm.tasks.local_tasks',
                         'vm.tasks.local_periodic_tasks',
                         'vm.tasks.local_agent_tasks',
                         'storage.tasks.local_tasks',
                         'storage.tasks.periodic_tasks',
                         'firewall.tasks.local_tasks',
                         'dashboard.tasks.local_periodic_tasks',
                         ])
39 40

celery.conf.update(
    # Task results go to the cache backend located at CACHE_URI.
    CELERY_RESULT_BACKEND='cache',
    CELERY_CACHE_BACKEND=CACHE_URI,
    CELERY_TASK_RESULT_EXPIRES=300,
    # Single direct-exchange queue for the manager. QUEUE_NAME is used
    # everywhere below so that the declared queue and the queue the
    # periodic tasks are sent to can never drift apart.
    CELERY_QUEUES=(
        Queue(QUEUE_NAME, Exchange('manager', type='direct'),
              routing_key="manager"),
    ),
    CELERYBEAT_SCHEDULE={
        # Hourly storage garbage collection.
        'storage.periodic_tasks': {
            'task': 'storage.tasks.periodic_tasks.garbage_collector',
            'schedule': timedelta(hours=1),
            'options': {'queue': QUEUE_NAME}
        },
        # Daily e-mail notification run.
        'dashboard.send_email_notifications': {
            'task': 'dashboard.tasks.local_periodic_tasks.'
            'send_email_notifications',
            'schedule': timedelta(hours=24),
            'options': {'queue': QUEUE_NAME}
        },
    }
)
63 64 65 66 67 68 69


@worker_ready.connect()
def cleanup_tasks(conf=None, **kwargs):
    """Run the activity cleanup for the manager queue.

    Registered as a receiver of Celery's ``worker_ready`` signal. ``conf``
    and any further keyword arguments delivered with the signal are
    accepted but not used.
    """
    # Imported at call time rather than at module load.
    # NOTE(review): presumably so the models are only loaded once the
    # worker is up -- confirm.
    from vm.models.activity import cleanup as run_activity_cleanup

    run_activity_cleanup(queue_name=QUEUE_NAME)