vmcelery.py 1.47 KB
Newer Older
Guba Sándor committed
1
""" Celery module for libvirt RPC calls. """
Guba Sándor committed
2 3
from celery import Celery
from kombu import Queue, Exchange
4
from os import getenv
Guba Sándor committed
5

6 7
from argparse import ArgumentParser

8 9 10 11

def to_bool(value):
    return value.lower() in ("true", "yes", "y", "t")

12

13 14 15 16 17 18 19 20 21 22 23 24 25
if to_bool(getenv("LIBVIRT_TEST", "False")):
    HOSTNAME = "vmdriver.test"
else:
    parser = ArgumentParser()
    parser.add_argument("-n", "--hostname", dest="hostname",
                        help="Define the full queue name with"
                        "with priority", metavar="hostname.queue.priority")
    (args, unknwon_args) = parser.parse_known_args()
    HOSTNAME = vars(args).pop("hostname")
    if HOSTNAME is None:
        raise Exception("You must define hostname as -n <hostname> or "
                        "--hostname=<hostname>.\n"
                        "Hostname format must be hostname.module.priority.")
Guba Sándor committed
26

27 28
AMQP_URI = getenv('AMQP_URI')

Guba Sándor committed
29

30
# Global configuration parameters declaration
tarokkk committed
31
lib_connection = None
32 33 34 35 36 37 38
native_ovs = False

if to_bool(getenv('LIBVIRT_KEEPALIVE', "False")):
    import libvirt
    lib_connection = libvirt.open(getenv('LIBVIRT_URI'))
if to_bool(getenv('NATIVE_OVS', "False")):
    native_ovs = True
tarokkk committed
39

40
celery = Celery('vmcelery',
41 42
                broker=AMQP_URI,
                include=['vmdriver'])
Guba Sándor committed
43 44

celery.conf.update(
45
    CELERY_RESULT_BACKEND='amqp',
Bach Dániel committed
46
    CELERY_TASK_RESULT_EXPIRES=300,
Guba Sándor committed
47
    CELERY_QUEUES=(
48
        Queue(HOSTNAME, Exchange(
Guba Sándor committed
49 50 51
            'vmdriver', type='direct'), routing_key="vmdriver"),
    )
)