Commit 2d1af23c by Czémán Arnold

Rework ceph operations with new configuration method

parent f759eb24
......@@ -6,30 +6,62 @@ import libvirt
import lxml.etree as ET
from base64 import b64decode
import logging
import re
import json
from util import req_connection, wrap_libvirtError, Connection
logger = logging.getLogger(__name__)
DUMP_SIZE_LIMIT = int(os.getenv("DUMP_SIZE_LIMIT", 20 * 1024 ** 3)) # 20GB
mon_regex = re.compile(r"^\[(?P<address>.+)\]\:(?P<port>\d+).*$")
class CephConfig:
def __init__(self, user=None, config_path=None, keyring_path=None):
self.user = user
self.config_path = config_path
self.keyring_path = keyring_path
if user is None:
self.user = "admin"
if config_path is None:
self.config_path = os.getenv("CEPH_CONFIG",
"/etc/ceph/ceph.conf")
if keyring_path is None:
default_keyring = "/etc/ceph/ceph.client.%s.keyring" % self.user
self.keyring_path = os.getenv("CEPH_KEYRING", default_keyring)
def cmd_args(self):
return ["--keyring", self.keyring_path,
"--id", self.user,
"--conf", self.config_path]
class CephConnection:
def __init__(self, pool_name, ceph_config=None):
def __init__(self, pool_name, conf=None):
self.pool_name = pool_name
self.ceph_config = ceph_config
self.conf = conf
if conf is None:
self.conf = CephConfig()
self.cluster = None
self.ioctx = None
def __enter__(self):
try:
if self.ceph_config is None:
self.ceph_config = os.getenv("CEPH_CONFIG",
"/etc/ceph/ceph.conf")
self.cluster = rados.Rados(conffile=self.ceph_config)
self.cluster.connect(timeout=2)
self.cluster = rados.Rados(
conffile=self.conf.config_path,
conf=dict(keyring=self.conf.keyring_path))
timeout = os.getenv("CEPH_TIMEOUT", 2)
self.cluster.connect(timeout=timeout)
self.ioctx = self.cluster.open_ioctx(self.pool_name)
except rados.InterruptedOrTimeoutError as e:
raise Exception(e)
......@@ -46,12 +78,41 @@ def sudo(*args):
subprocess.check_output(["/bin/sudo"] + list(args))
def map_rbd(ceph_path, local_path):
def unmap_rbd(conf, local_path):
sudo("/bin/rbd", "unmap", local_path, *conf.cmd_args())
def map_rbd(conf, ceph_path, local_path):
try:
sudo("/bin/rbd", "map", ceph_path)
sudo("/bin/rbd", "map", ceph_path, *conf.cmd_args())
except:
sudo("/bin/rbd", "unmap", local_path)
sudo("/bin/rbd", "map", ceph_path)
unmap_rbd(conf, local_path)
sudo("/bin/rbd", "map", ceph_path, *conf.cmd_args())
def get_secret_key(conf, user):
return subprocess.check_output((["/bin/ceph",
"auth", "print-key", "client.%s" % user]
+ conf.cmd_args()))
def parse_endpoint(mon):
m = mon_regex.match(mon["addr"])
return (m.group("address"), m.group("port"))
def _get_endpoints(conf):
output = subprocess.check_output((["/bin/ceph",
"mon", "dump", "--format=json"]
+ conf.cmd_args()))
mon_data = json.loads(output)
mons = mon_data["mons"]
return map(parse_endpoint, mons)
def get_endpoints(user):
conf = CephConfig(user=user)
return _get_endpoints(conf)
def save(domain, poolname, diskname):
......@@ -69,13 +130,13 @@ def save(domain, poolname, diskname):
rbd_inst.remove(conn.ioctx, diskname)
rbd_inst.create(conn.ioctx, diskname, disk_size)
try:
map_rbd(ceph_path, local_path)
map_rbd(conn.conf, ceph_path, local_path)
domain.save(local_path)
except:
rbd_inst.remove(conn.ioctx, diskname)
raise
finally:
sudo("/bin/rbd", "unmap", local_path)
unmap_rbd(conn.conf, local_path)
def restore(connection, poolname, diskname):
......@@ -84,9 +145,9 @@ def restore(connection, poolname, diskname):
ceph_path = os.path.join(poolname, diskname)
local_path = os.path.join("/dev/rbd", ceph_path)
map_rbd(ceph_path, local_path)
map_rbd(connection.conf, ceph_path, local_path)
connection.restore(local_path)
sudo("/bin/rbd", "unmap", local_path)
unmap_rbd(connection.conf, local_path)
with CephConnection(poolname) as conn:
rbd_inst = rbd.RBD()
rbd_inst.remove(conn.ioctx, diskname)
......@@ -123,7 +184,10 @@ def find_secret(user):
@req_connection
@wrap_libvirtError
def create_secret(user, secretkey):
def create_secret(user):
conf = CephConfig()
secretkey = get_secret_key(conf, user)
xml = generate_secret_xml(user)
conn = Connection.get()
secret = conn.secretDefineXML(xml)
......@@ -141,9 +205,9 @@ def delete_secret(user):
logger.info("Secret with uuid: '%s' deleted", secret.UUIDString())
def check_secret(user, secretkey):
def check_secret(user):
secret = find_secret(user)
if secret is None:
secret = create_secret(user, secretkey)
secret = create_secret(user)
return secret.UUIDString()
import lxml.etree as ET
from vmcelery import native_ovs
from ceph import check_secret
from ceph import check_secret, get_endpoints
# VM Instance class
......@@ -246,7 +246,6 @@ class CephVMDisk(VMDisk):
def __init__(self,
source,
endpoints,
disk_device="disk",
driver_name="qemu",
driver_type="raw",
......@@ -254,8 +253,7 @@ class CephVMDisk(VMDisk):
target_device="vda",
target_bus="virtio",
protocol="rbd",
ceph_user=None,
secret=None):
ceph_user=None):
super(CephVMDisk, self).__init__(
source=source,
......@@ -267,12 +265,11 @@ class CephVMDisk(VMDisk):
target_device=target_device,
target_bus=target_bus)
self.endpoints = endpoints
self.protocol = protocol
self.ceph_user = ceph_user
self.secret = secret
if ceph_user is not None and secret is not None:
check_secret(ceph_user, secret)
if ceph_user is not None:
check_secret(ceph_user)
self.endpoints = get_endpoints(ceph_user)
@classmethod
def deserialize(cls, desc):
......@@ -291,7 +288,7 @@ class CephVMDisk(VMDisk):
ET.SubElement(source, "host",
attrib={"name": name, "port": unicode(port)})
if self.ceph_user is not None and self.secret is not None:
if self.ceph_user is not None:
auth = ET.SubElement(
xml_top,
"auth",
......
......@@ -586,6 +586,6 @@ def get_node_metrics():
@celery.task
def refresh_secret(user, secret):
def refresh_secret(user):
ceph.delete_secret(user)
ceph.check_secret(user, secret)
ceph.check_secret(user)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment