Commit f759eb24 by Czémán Arnold

add secret handler functions and minimal refactor on CephVMDisk

parent a815ee13
...@@ -2,7 +2,14 @@ import rados ...@@ -2,7 +2,14 @@ import rados
import rbd import rbd
import os import os
import subprocess import subprocess
import libvirt
import lxml.etree as ET
from base64 import b64decode
import logging
from util import req_connection, wrap_libvirtError, Connection
logger = logging.getLogger(__name__)
DUMP_SIZE_LIMIT = int(os.getenv("DUMP_SIZE_LIMIT", 20 * 1024 ** 3)) # 20GB DUMP_SIZE_LIMIT = int(os.getenv("DUMP_SIZE_LIMIT", 20 * 1024 ** 3)) # 20GB
...@@ -83,3 +90,60 @@ def restore(connection, poolname, diskname): ...@@ -83,3 +90,60 @@ def restore(connection, poolname, diskname):
with CephConnection(poolname) as conn: with CephConnection(poolname) as conn:
rbd_inst = rbd.RBD() rbd_inst = rbd.RBD()
rbd_inst.remove(conn.ioctx, diskname) rbd_inst.remove(conn.ioctx, diskname)
def generate_secret_xml(user):
xml = ET.Element(
"secret",
attrib={
"ephemeral": "no",
"private": "no",
})
ET.SubElement(xml, "description").text = "CEPH passpharse for " + user
usage = ET.SubElement(xml, "usage", attrib={"type": "ceph"})
ET.SubElement(usage, "name").text = user
return ET.tostring(xml,
encoding='utf8',
method='xml',
pretty_print=True)
@req_connection
@wrap_libvirtError
def find_secret(user):
conn = Connection.get()
try:
return conn.secretLookupByUsage(
libvirt.VIR_SECRET_USAGE_TYPE_CEPH, user)
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_SECRET:
return None
raise
@req_connection
@wrap_libvirtError
def create_secret(user, secretkey):
xml = generate_secret_xml(user)
conn = Connection.get()
secret = conn.secretDefineXML(xml)
decoded_key = b64decode(secretkey)
secret.setValue(decoded_key)
logger.info("Secret generated with uuid: '%s'", secret.UUIDString())
return secret
@wrap_libvirtError
def delete_secret(user):
secret = find_secret(user)
if secret is not None:
secret.undefine()
logger.info("Secret with uuid: '%s' deleted", secret.UUIDString())
def check_secret(user, secretkey):
secret = find_secret(user)
if secret is None:
secret = create_secret(user, secretkey)
return secret.UUIDString()
import libvirt
from decorator import decorator
import logging
import os
from vmcelery import lib_connection, to_bool
vm_xml_dump = None
class Connection(object):
""" Singleton class to handle connection."""
# __metaclass__ = Singleton
connection = None
@classmethod
def get(cls):
""" Return the libvirt connection."""
return cls.connection
@classmethod
def set(cls, connection):
""" Set the libvirt connection."""
cls.connection = connection
@decorator
def req_connection(original_function, *args, **kw):
"""Connection checking decorator for libvirt.
If envrionment variable LIBVIRT_KEEPALIVE is set
it will use the connection from the celery worker.
Return the decorateed function
"""
logging.debug("Decorator running")
if Connection.get() is None:
connect()
try:
logging.debug("Decorator calling original function")
return_value = original_function(*args, **kw)
finally:
logging.debug("Finally part of decorator")
disconnect()
return return_value
else:
logging.debug("Decorator calling original \
function with active connection")
return_value = original_function(*args, **kw)
return return_value
@decorator
def wrap_libvirtError(original_function, *args, **kw):
""" Decorator to wrap libvirt error in simple Exception.
Return decorated function
"""
try:
return original_function(*args, **kw)
except libvirt.libvirtError as e:
logging.error(e.get_error_message())
e_msg = e.get_error_message()
if vm_xml_dump is not None:
e_msg += "\n"
e_msg += vm_xml_dump
new_e = Exception(e.get_error_message())
new_e.libvirtError = True
raise new_e
@wrap_libvirtError
def connect(connection_string='qemu:///system'):
""" Connect to the libvirt daemon.
String is specified in the connection_string parameter
the default is the local root.
"""
if not to_bool(os.getenv('LIBVIRT_KEEPALIVE', "False")):
if Connection.get() is None:
Connection.set(libvirt.open(connection_string))
logging.debug("Connection estabilished to libvirt.")
else:
logging.debug("There is already an active connection to libvirt.")
else:
Connection.set(lib_connection)
logging.debug("Using celery libvirt connection connection.")
@wrap_libvirtError
def disconnect():
""" Disconnect from the active libvirt daemon connection."""
if os.getenv('LIBVIRT_KEEPALIVE') is None:
if Connection.get() is None:
logging.debug('There is no available libvirt conection.')
else:
Connection.get().close()
logging.debug('Connection closed to libvirt.')
Connection.set(None)
else:
logging.debug('Keepalive connection should not close.')
import lxml.etree as ET import lxml.etree as ET
from vmcelery import native_ovs from vmcelery import native_ovs
from ceph import check_secret
# VM Instance class # VM Instance class
...@@ -255,7 +255,7 @@ class CephVMDisk(VMDisk): ...@@ -255,7 +255,7 @@ class CephVMDisk(VMDisk):
target_bus="virtio", target_bus="virtio",
protocol="rbd", protocol="rbd",
ceph_user=None, ceph_user=None,
secret_uuid=None): secret=None):
super(CephVMDisk, self).__init__( super(CephVMDisk, self).__init__(
source=source, source=source,
...@@ -270,7 +270,9 @@ class CephVMDisk(VMDisk): ...@@ -270,7 +270,9 @@ class CephVMDisk(VMDisk):
self.endpoints = endpoints self.endpoints = endpoints
self.protocol = protocol self.protocol = protocol
self.ceph_user = ceph_user self.ceph_user = ceph_user
self.secret_uuid = secret_uuid self.secret = secret
if ceph_user is not None and secret is not None:
check_secret(ceph_user, secret)
@classmethod @classmethod
def deserialize(cls, desc): def deserialize(cls, desc):
...@@ -289,11 +291,13 @@ class CephVMDisk(VMDisk): ...@@ -289,11 +291,13 @@ class CephVMDisk(VMDisk):
ET.SubElement(source, "host", ET.SubElement(source, "host",
attrib={"name": name, "port": unicode(port)}) attrib={"name": name, "port": unicode(port)})
if self.ceph_user is not None and self.secret_uuid is not None: if self.ceph_user is not None and self.secret is not None:
auth = ET.SubElement(xml_top, "auth", auth = ET.SubElement(
xml_top,
"auth",
attrib={"username": self.ceph_user}) attrib={"username": self.ceph_user})
ET.SubElement(auth, "secret", ET.SubElement(auth, "secret",
attrib={"type": "ceph", "uuid": self.secret_uuid}) attrib={"type": "ceph", "usage": self.ceph_user})
return xml_top return xml_top
......
...@@ -5,7 +5,6 @@ import os ...@@ -5,7 +5,6 @@ import os
import sys import sys
import socket import socket
import json import json
from decorator import decorator
import lxml.etree as ET import lxml.etree as ET
from psutil import NUM_CPUS, virtual_memory, cpu_percent from psutil import NUM_CPUS, virtual_memory, cpu_percent
...@@ -14,14 +13,15 @@ from celery.contrib.abortable import AbortableTask ...@@ -14,14 +13,15 @@ from celery.contrib.abortable import AbortableTask
from vm import VMInstance, VMDisk, CephVMDisk, VMNetwork from vm import VMInstance, VMDisk, CephVMDisk, VMNetwork
from vmcelery import celery, lib_connection, to_bool from vmcelery import celery
import ceph import ceph
import util
from util import req_connection, wrap_libvirtError, Connection
sys.path.append(os.path.dirname(os.path.basename(__file__)))
vm_xml_dump = None sys.path.append(os.path.dirname(os.path.basename(__file__)))
state_dict = {0: 'NOSTATE', state_dict = {0: 'NOSTATE',
1: 'RUNNING', 1: 'RUNNING',
...@@ -47,106 +47,6 @@ state_dict = {0: 'NOSTATE', ...@@ -47,106 +47,6 @@ state_dict = {0: 'NOSTATE',
# return cls._instances[cls] # return cls._instances[cls]
class Connection(object):
""" Singleton class to handle connection."""
# __metaclass__ = Singleton
connection = None
@classmethod
def get(cls):
""" Return the libvirt connection."""
return cls.connection
@classmethod
def set(cls, connection):
""" Set the libvirt connection."""
cls.connection = connection
@decorator
def req_connection(original_function, *args, **kw):
"""Connection checking decorator for libvirt.
If envrionment variable LIBVIRT_KEEPALIVE is set
it will use the connection from the celery worker.
Return the decorateed function
"""
logging.debug("Decorator running")
if Connection.get() is None:
connect()
try:
logging.debug("Decorator calling original function")
return_value = original_function(*args, **kw)
finally:
logging.debug("Finally part of decorator")
disconnect()
return return_value
else:
logging.debug("Decorator calling original \
function with active connection")
return_value = original_function(*args, **kw)
return return_value
@decorator
def wrap_libvirtError(original_function, *args, **kw):
""" Decorator to wrap libvirt error in simple Exception.
Return decorated function
"""
try:
return original_function(*args, **kw)
except libvirt.libvirtError as e:
logging.error(e.get_error_message())
e_msg = e.get_error_message()
if vm_xml_dump is not None:
e_msg += "\n"
e_msg += vm_xml_dump
new_e = Exception(e.get_error_message())
new_e.libvirtError = True
raise new_e
@wrap_libvirtError
def connect(connection_string='qemu:///system'):
""" Connect to the libvirt daemon.
String is specified in the connection_string parameter
the default is the local root.
"""
if not to_bool(os.getenv('LIBVIRT_KEEPALIVE', "False")):
if Connection.get() is None:
Connection.set(libvirt.open(connection_string))
logging.debug("Connection estabilished to libvirt.")
else:
logging.debug("There is already an active connection to libvirt.")
else:
Connection.set(lib_connection)
logging.debug("Using celery libvirt connection connection.")
@wrap_libvirtError
def disconnect():
""" Disconnect from the active libvirt daemon connection."""
if os.getenv('LIBVIRT_KEEPALIVE') is None:
if Connection.get() is None:
logging.debug('There is no available libvirt conection.')
else:
Connection.get().close()
logging.debug('Connection closed to libvirt.')
Connection.set(None)
else:
logging.debug('Keepalive connection should not close.')
@celery.task @celery.task
@req_connection @req_connection
@wrap_libvirtError @wrap_libvirtError
...@@ -177,6 +77,7 @@ def create(vm_desc): ...@@ -177,6 +77,7 @@ def create(vm_desc):
if vm.vm_type == "test": if vm.vm_type == "test":
vm.arch = "i686" vm.arch = "i686"
vm_xml_dump = vm.dump_xml() vm_xml_dump = vm.dump_xml()
util.vm_xml_dump = vm_xml_dump
logging.info(vm_xml_dump) logging.info(vm_xml_dump)
# Emulating DOMAIN_START_PAUSED FLAG behaviour on test driver # Emulating DOMAIN_START_PAUSED FLAG behaviour on test driver
if vm.vm_type == "test": if vm.vm_type == "test":
...@@ -682,3 +583,9 @@ def get_node_metrics(): ...@@ -682,3 +583,9 @@ def get_node_metrics():
result['cpu.usage'] = cpu_percent(0) result['cpu.usage'] = cpu_percent(0)
result['memory.usage'] = virtual_memory().percent result['memory.usage'] = virtual_memory().percent
return result return result
@celery.task
def refresh_secret(user, secret):
ceph.delete_secret(user)
ceph.check_secret(user, secret)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment