Commit 92240946 by Szeberényi Imre

Migration to IK

parent c0bfc619
CELERY_RESULT_BACKEND = 'amqp://' CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_RESULT_EXPIRES = 300 CELERY_TASK_RESULT_EXPIRES = 300
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True CELERY_ENABLE_UTC = True
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
...@@ -6,8 +6,12 @@ from netcelery import celery ...@@ -6,8 +6,12 @@ from netcelery import celery
from os import getenv from os import getenv
from vm import VMNetwork from vm import VMNetwork
from vmcelery import native_ovs from vmcelery import native_ovs
from celery.utils.log import get_task_logger
driver = getenv("HYPERVISOR_TYPE", "test") driver = getenv("HYPERVISOR_TYPE", "test")
logger = get_task_logger(__name__)
@celery.task @celery.task
def create(network): def create(network):
...@@ -40,7 +44,7 @@ def ovs_command_execute(command): ...@@ -40,7 +44,7 @@ def ovs_command_execute(command):
""" """
command = ['sudo', 'ovs-vsctl'] + command command = ['sudo', 'ovs-vsctl'] + command
return_val = subprocess.call(command) return_val = subprocess.call(command)
logging.info('OVS command: %s executed.', command) logger.info('OVS command: %s executed.', command)
return return_val return return_val
...@@ -53,7 +57,7 @@ def ofctl_command_execute(command): ...@@ -53,7 +57,7 @@ def ofctl_command_execute(command):
""" """
command = ['sudo', 'ovs-ofctl'] + command command = ['sudo', 'ovs-ofctl'] + command
return_val = subprocess.call(command) return_val = subprocess.call(command)
logging.info('OVS flow command: %s executed.', command) logger.info('OVS flow command: %s executed.', command)
return return_val return return_val
...@@ -304,7 +308,7 @@ def pull_up_interface(network): ...@@ -304,7 +308,7 @@ def pull_up_interface(network):
""" """
command = ['sudo', 'ip', 'link', 'set', 'up', network.name] command = ['sudo', 'ip', 'link', 'set', 'up', network.name]
return_val = subprocess.call(command) return_val = subprocess.call(command)
logging.info('IP command: %s executed.', command) logger.info('IP command: %s executed.', command)
return return_val return return_val
......
...@@ -21,8 +21,8 @@ class VMInstance: ...@@ -21,8 +21,8 @@ class VMInstance:
raw_data = None raw_data = None
def __init__(self, def __init__(self,
name, name=None,
vcpu, vcpu=None,
vcpu_max=None, vcpu_max=None,
memory_max=None, memory_max=None,
memory=None, memory=None,
...@@ -228,8 +228,8 @@ class VMDisk: ...@@ -228,8 +228,8 @@ class VMDisk:
cache_size = None cache_size = None
def __init__(self, def __init__(self,
name, name=None,
source, source=None,
disk_type="file", disk_type="file",
disk_device="disk", disk_device="disk",
driver_name="qemu", driver_name="qemu",
......
...@@ -7,14 +7,11 @@ import socket ...@@ -7,14 +7,11 @@ import socket
import json import json
from decorator import decorator from decorator import decorator
import lxml.etree as ET import lxml.etree as ET
from psutil import cpu_count, virtual_memory, cpu_percent from psutil import cpu_count, virtual_memory, cpu_percent
from celery.contrib.abortable import AbortableTask from celery.contrib.abortable import AbortableTask
from vm import VMInstance, VMDisk, VMNetwork from vm import VMInstance, VMDisk, VMNetwork
from vmcelery import celery, lib_connection, to_bool from vmcelery import celery, lib_connection, to_bool
from celery.utils.log import get_task_logger
sys.path.append(os.path.dirname(os.path.basename(__file__))) sys.path.append(os.path.dirname(os.path.basename(__file__)))
...@@ -31,6 +28,8 @@ state_dict = {0: 'NOSTATE', ...@@ -31,6 +28,8 @@ state_dict = {0: 'NOSTATE',
} }
logger = get_task_logger(__name__)
# class Singleton(type): # class Singleton(type):
# #
# """ Singleton class.""" # """ Singleton class."""
...@@ -74,18 +73,18 @@ def req_connection(original_function, *args, **kw): ...@@ -74,18 +73,18 @@ def req_connection(original_function, *args, **kw):
Return the decorateed function Return the decorateed function
""" """
logging.debug("Decorator running") logger.debug("Decorator running")
if Connection.get() is None: if Connection.get() is None:
connect() connect()
try: try:
logging.debug("Decorator calling original function") logger.debug("Decorator calling original function")
return_value = original_function(*args, **kw) return_value = original_function(*args, **kw)
finally: finally:
logging.debug("Finally part of decorator") logger.debug("Finally part of decorator")
disconnect() disconnect()
return return_value return return_value
else: else:
logging.debug("Decorator calling original \ logger.debug("Decorator calling original \
function with active connection") function with active connection")
return_value = original_function(*args, **kw) return_value = original_function(*args, **kw)
return return_value return return_value
...@@ -101,7 +100,7 @@ def wrap_libvirtError(original_function, *args, **kw): ...@@ -101,7 +100,7 @@ def wrap_libvirtError(original_function, *args, **kw):
try: try:
return original_function(*args, **kw) return original_function(*args, **kw)
except libvirt.libvirtError as e: except libvirt.libvirtError as e:
logging.error(e.get_error_message()) logger.error(e.get_error_message())
e_msg = e.get_error_message() e_msg = e.get_error_message()
if vm_xml_dump is not None: if vm_xml_dump is not None:
e_msg += "\n" e_msg += "\n"
...@@ -122,12 +121,12 @@ def connect(connection_string='qemu:///system'): ...@@ -122,12 +121,12 @@ def connect(connection_string='qemu:///system'):
if not to_bool(os.getenv('LIBVIRT_KEEPALIVE', "False")): if not to_bool(os.getenv('LIBVIRT_KEEPALIVE', "False")):
if Connection.get() is None: if Connection.get() is None:
Connection.set(libvirt.open(connection_string)) Connection.set(libvirt.open(connection_string))
logging.debug("Connection estabilished to libvirt.") logger.debug("Connection estabilished to libvirt.")
else: else:
logging.debug("There is already an active connection to libvirt.") logger.debug("There is already an active connection to libvirt.")
else: else:
Connection.set(lib_connection) Connection.set(lib_connection)
logging.debug("Using celery libvirt connection connection.") logger.debug("Using celery libvirt connection connection.")
@wrap_libvirtError @wrap_libvirtError
...@@ -135,13 +134,13 @@ def disconnect(): ...@@ -135,13 +134,13 @@ def disconnect():
""" Disconnect from the active libvirt daemon connection.""" """ Disconnect from the active libvirt daemon connection."""
if os.getenv('LIBVIRT_KEEPALIVE') is None: if os.getenv('LIBVIRT_KEEPALIVE') is None:
if Connection.get() is None: if Connection.get() is None:
logging.debug('There is no available libvirt conection.') logger.debug('There is no available libvirt conection.')
else: else:
Connection.get().close() Connection.get().close()
logging.debug('Connection closed to libvirt.') logger.debug('Connection closed to libvirt.')
Connection.set(None) Connection.set(None)
else: else:
logging.debug('Keepalive connection should not close.') logger.debug('Keepalive connection should not close.')
@celery.task @celery.task
...@@ -150,7 +149,7 @@ def disconnect(): ...@@ -150,7 +149,7 @@ def disconnect():
def define(vm): def define(vm):
""" Define permanent virtual machine from xml. """ """ Define permanent virtual machine from xml. """
Connection.get().defineXML(vm.dump_xml()) Connection.get().defineXML(vm.dump_xml())
logging.info("Virtual machine %s is defined from xml", vm.name) logger.info("Virtual machine %s is defined from xml", vm.name)
@celery.task @celery.task
...@@ -174,7 +173,7 @@ def create(vm_desc): ...@@ -174,7 +173,7 @@ def create(vm_desc):
if vm.vm_type == "test": if vm.vm_type == "test":
vm.arch = "i686" vm.arch = "i686"
vm_xml_dump = vm.dump_xml() vm_xml_dump = vm.dump_xml()
logging.info(vm_xml_dump) logger.info(vm_xml_dump)
# Emulating DOMAIN_START_PAUSED FLAG behaviour on test driver # Emulating DOMAIN_START_PAUSED FLAG behaviour on test driver
if vm.vm_type == "test": if vm.vm_type == "test":
Connection.get().createXML( Connection.get().createXML(
...@@ -183,10 +182,10 @@ def create(vm_desc): ...@@ -183,10 +182,10 @@ def create(vm_desc):
domain.suspend() domain.suspend()
# Real driver create # Real driver create
else: else:
logging.info("Virtual machine %s being created from xml", vm.name) logger.info("Virtual machine %s being created from xml", vm.name)
Connection.get().createXML( Connection.get().createXML(
vm_xml_dump, libvirt.VIR_DOMAIN_START_PAUSED) vm_xml_dump, libvirt.VIR_DOMAIN_START_PAUSED)
logging.info("Virtual machine %s is created from xml", vm.name) logger.info("Virtual machine %s is created from xml", vm.name)
# context # context
try: try:
sock = socket.create_connection(('127.0.0.1', 1235), 3) sock = socket.create_connection(('127.0.0.1', 1235), 3)
...@@ -195,7 +194,7 @@ def create(vm_desc): ...@@ -195,7 +194,7 @@ def create(vm_desc):
sock.sendall(json.dumps(data)) sock.sendall(json.dumps(data))
sock.close() sock.close()
except socket.error: except socket.error:
logging.error('Unable to connect to context server') logger.error('Unable to connect to context server')
return vm_xml_dump return vm_xml_dump
...@@ -211,12 +210,12 @@ class shutdown(AbortableTask): ...@@ -211,12 +210,12 @@ class shutdown(AbortableTask):
def run(self, args): def run(self, args):
from time import sleep from time import sleep
name, = args name, = args
logging.info("Shutdown started for vm: %s", name) logger.info("Shutdown started for vm: %s", name)
try: try:
domain = lookupByName(name) domain = lookupByName(name)
logging.info("%s domain found in shutdown", name) logger.info("%s domain found in shutdown", name)
domain.shutdown() domain.shutdown()
logging.info("Domain shutdown called for vm: %s", name) logger.info("Domain shutdown called for vm: %s", name)
while True: while True:
try: try:
Connection.get().lookupByName(name) Connection.get().lookupByName(name)
...@@ -227,7 +226,7 @@ class shutdown(AbortableTask): ...@@ -227,7 +226,7 @@ class shutdown(AbortableTask):
raise raise
else: else:
if self.is_aborted(): if self.is_aborted():
logging.info("Shutdown aborted on vm: %s", name) logger.info("Shutdown aborted on vm: %s", name)
return return
sleep(5) sleep(5)
except libvirt.libvirtError as e: except libvirt.libvirtError as e:
...@@ -607,7 +606,7 @@ def __check_detach(domain, disk): ...@@ -607,7 +606,7 @@ def __check_detach(domain, disk):
def attach_network(name, net): def attach_network(name, net):
domain = lookupByName(name) domain = lookupByName(name)
net = VMNetwork.deserialize(net) net = VMNetwork.deserialize(net)
logging.error(net.dump_xml()) logger.error(net.dump_xml())
domain.attachDevice(net.dump_xml()) domain.attachDevice(net.dump_xml())
...@@ -628,7 +627,7 @@ def resize_disk(name, path, size): ...@@ -628,7 +627,7 @@ def resize_disk(name, path, size):
# domain.blockResize(path, int(size), # domain.blockResize(path, int(size),
# flags=libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES) # flags=libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES)
# To be compatible with libvirt < 0.9.11 # To be compatible with libvirt < 0.9.11
logging.debug(" === Resize : " + size) logger.debug(" === Resize : " + size)
domain.blockResize(path,int(size)//1024, 0) domain.blockResize(path,int(size)//1024, 0)
...@@ -648,7 +647,7 @@ def get_architecture(): ...@@ -648,7 +647,7 @@ def get_architecture():
@celery.task @celery.task
def get_core_num(): def get_core_num():
return cpu_count return cpu_count()
@celery.task @celery.task
...@@ -667,12 +666,13 @@ def get_driver_version(): ...@@ -667,12 +666,13 @@ def get_driver_version():
'commit_text': lc.summary, 'commit_text': lc.summary,
'is_dirty': repo.is_dirty()} 'is_dirty': repo.is_dirty()}
except Exception as e: except Exception as e:
logging.exception("Unhandled exception: %s", e) logger.exception("Unhandled exception: %s", e)
return None return None
@celery.task @celery.task
def get_info(): def get_info():
logger.debug("Get_Info")
return {'core_num': get_core_num(), return {'core_num': get_core_num(),
'ram_size': get_ram_size(), 'ram_size': get_ram_size(),
'architecture': get_architecture(), 'architecture': get_architecture(),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment