Commit 1ddad930 by Czémán Arnold

disk: add CephDisk, which overrides some Disk methods in a Ceph-specific way; small fixes in Disk

parent 7de1a3f5
......@@ -13,6 +13,9 @@ import re
import requests
import rados
import rbd
logger = logging.getLogger(__name__)
re_qemu_img = re.compile(r'(file format: (?P<format>(qcow2|raw))|'
......@@ -36,9 +39,9 @@ class Disk(object):
Handle qcow2, raw and iso images.
TYPES, CREATE_TYPES, SNAPSHOT_TYPES are hand managed restrictions.
'''
TYPES = ['snapshot', 'normal']
FORMATS = ['qcow2', 'raw', 'iso']
CREATE_FORMATS = ['qcow2', 'raw']
TYPES = ('snapshot', 'normal')
FORMATS = ('qcow2', 'raw', 'iso', 'rbd')
CREATE_FORMATS = ('qcow2', 'raw', 'rbd')
def __init__(self, dir, name, format, type, size,
base_name, actual_size=0):
......@@ -72,6 +75,7 @@ class Disk(object):
logging.info(desc)
if isinstance(desc, basestring):
desc = json.loads(desc)
del desc["data_store_type"]
return cls(**desc)
def get_desc(self):
......@@ -307,6 +311,8 @@ class Disk(object):
'-b', self.get_base(),
'-f', self.format,
self.get_path()]
logging.info("Snapshot image: %s (%s)" % (self.get_path(),
self.get_base()))
# Call subprocess
subprocess.check_output(cmdline)
......@@ -426,3 +432,231 @@ class Disk(object):
def list(cls, dir):
""" List all files in <dir> directory."""
return [cls.get(dir, file) for file in os.listdir(dir)]
class CephConnection:
    """Context manager for a rados cluster handle plus a pool I/O context.

    Usage::

        with CephConnection("mypool") as conn:
            ...  # use conn.ioctx

    If ``ceph_config`` is None, the path is taken from the CEPH_CONFIG
    environment variable, falling back to /etc/ceph/ceph.conf.
    """

    def __init__(self, pool_name, ceph_config=None):
        self.pool_name = pool_name
        self.ceph_config = ceph_config
        self.cluster = None
        self.ioctx = None

    def __enter__(self):
        if self.ceph_config is None:
            self.ceph_config = os.getenv("CEPH_CONFIG",
                                         "/etc/ceph/ceph.conf")
        self.cluster = rados.Rados(conffile=self.ceph_config)
        self.cluster.connect()
        try:
            self.ioctx = self.cluster.open_ioctx(self.pool_name)
        except Exception:
            # BUG FIX: do not leak the connected cluster handle when the
            # pool cannot be opened -- previously the cluster was never
            # shut down on this failure path.
            self.cluster.shutdown()
            self.cluster = None
            raise
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Close in reverse acquisition order; runs even on exceptions.
        self.ioctx.close()
        self.cluster.shutdown()
class CephDisk(Disk):
    """Disk specialization storing images as RBD blocks in a Ceph pool.

    The inherited ``dir`` attribute is reused as the Ceph pool name, and
    paths are rendered as ``rbd:<pool>/<name>`` URIs (the form qemu
    understands).
    """
    # TODO: iso handling
    TYPES = ('snapshot', 'normal')

    def __init__(self, dir, name, format, type, size,
                 base_name, actual_size=0):
        """
        dir: the pool name
        """
        super(CephDisk, self).__init__(dir, name, format, type, size,
                                       base_name, actual_size)
        self.dir = dir

    @property
    def checksum(self):
        # BUG FIX: the getter previously declared a ``blocksize``
        # parameter, so every attribute access failed with TypeError
        # instead of the intended NotImplementedError.
        raise NotImplementedError()

    @classmethod
    def deserialize(cls, desc):
        """Create cls object from JSON (string or already-parsed dict)."""
        logging.info(desc)
        if isinstance(desc, basestring):
            desc = json.loads(desc)
        # "data_store_type" is dispatch metadata added by the caller,
        # not a constructor argument.
        del desc["data_store_type"]
        return cls(**desc)

    def get_path(self):
        """Return the qemu rbd URI of this block."""
        return "rbd:%s/%s" % (self.dir, self.name)

    def get_base(self):
        """Return the qemu rbd URI of the base (parent) block."""
        return "rbd:%s/%s" % (self.dir, self.base_name)

    def __create(self, ioctx):
        """Create an empty RBD image; requires format 'rbd', type 'normal'."""
        if self.format != "rbd":
            raise Exception('Invalid format: %s' % self.format)
        if self.type != 'normal':
            # BUG FIX: the message used to interpolate self.format here.
            raise Exception('Invalid type: %s' % self.type)
        try:
            rbd_inst = rbd.RBD()
            logging.info("Create ceph block: %s (%s)" % (self.get_path(),
                                                         str(self.size)))
            # format 2 image with layering so it can later be cloned
            rbd_inst.create(ioctx, self.name, self.size, old_format=False,
                            features=rbd.RBD_FEATURE_LAYERING)
        except rbd.ImageExists:
            raise Exception('Ceph image already exists: %s' % self.get_path())

    def create(self):
        self.__with_ceph_connection(self.__create)

    def check_valid_image(self):
        # TODO
        return True

    def download(self, task, url, parent_id=None):
        # TODO
        pass

    def __snapshot(self, ioctx):
        ''' Creating snapshot with base image.
        '''
        # Check that the snapshot type and the rbd format match
        if self.type != 'snapshot':
            raise Exception('Invalid type: %s' % self.type)
        if self.format != "rbd":
            raise Exception('Invalid format: %s' % self.format)
        try:
            rbd_inst = rbd.RBD()
            logging.info("Snapshot ceph block: %s (%s)" % (self.get_path(),
                                                           self.get_base()))
            # Copy-on-write clone of the base image's "snapshot" snap.
            rbd_inst.clone(ioctx, self.base_name, "snapshot",
                           ioctx, self.name,
                           features=rbd.RBD_FEATURE_LAYERING)
        except rbd.ImageExists:
            # TODO: not enough
            raise Exception('Ceph image already exists: %s' % self.get_base())
        except Exception as e:
            raise Exception("%s: %s" % (type(e), e))

    def snapshot(self):
        self.__with_ceph_connection(self.__snapshot)

    def merge_disk_with_base(self, ioctx, task, new_disk, parent_id=None):
        """Publish this image under new_disk's name via a protected snap."""
        with rbd.Image(ioctx, self.name) as image:
            logger.debug("Merging %s into %s.",
                         self.get_path(),
                         new_disk.get_path())
            image.create_snap(new_disk.name)
            image.protect_snap(new_disk.name)
        if not task.is_aborted():
            task.update_state(task_id=parent_id,
                              state=task.AsyncResult(parent_id).state,
                              meta={'size': new_disk.size, 'percent': 100})
        else:
            logger.warning("Merging new disk %s is aborted by user.",
                           new_disk.get_path())
            logger.warning("Aborted merge job, removing %s",
                           new_disk.get_path())
            # Roll back: drop the snapshot that was just created.
            # NOTE(review): the snap is still protected here; librbd
            # requires unprotect_snap before remove_snap -- confirm.
            with rbd.Image(ioctx, self.name) as image:
                image.remove_snap(new_disk.name)

    def merge_disk_without_base(self, ioctx, task, new_disk, parent_id=None,
                                length=1024 * 1024):
        """Flat-copy this image into new_disk and snapshot the result."""
        with rbd.Image(ioctx, self.name) as image:
            logger.debug("Merging %s into %s.",
                         self.get_path(),
                         new_disk.get_path())
            # Full (flattened) copy of the image data.
            image.copy(ioctx, new_disk.name)
        with rbd.Image(ioctx, new_disk.name) as image:
            image.create_snap("snapshot")
            image.protect_snap("snapshot")
        if not task.is_aborted():
            task.update_state(task_id=parent_id,
                              state=task.AsyncResult(parent_id).state,
                              meta={'size': new_disk.size, 'percent': 100})
        else:
            logger.warning("Merging new disk %s is aborted by user.",
                           new_disk.get_path())
            logger.warning("Aborted merge job, removing %s",
                           new_disk.get_path())
            # BUG FIX: RBD.remove() requires the ioctx as its first
            # argument, and the image must not be held open while it is
            # removed (the old code opened it in a ``with`` block).
            # NOTE(review): the protected "snapshot" snap probably has
            # to be unprotected/removed before removal succeeds --
            # confirm against librbd.
            rbd_inst = rbd.RBD()
            rbd_inst.remove(ioctx, new_disk.name)

    def __merge(self, ioctx, task, new_disk, parent_id=None):
        """ Merging a new_disk from the actual disk and its base.
        """
        if task.is_aborted():
            raise AbortException()
        # if self.base_name:
        #     self.merge_disk_with_base(ioctx, task, new_disk, parent_id)
        # else:
        self.merge_disk_without_base(ioctx, task, new_disk, parent_id)

    def merge(self, task, new_disk, parent_id=None):
        self.__with_ceph_connection(self.__merge, task, new_disk, parent_id)

    def __delete(self, ioctx):
        """Remove the block and all its snapshots; missing images are OK."""
        try:
            logger.debug("Delete ceph block %s" % self.get_path())
            with rbd.Image(ioctx, self.name) as image:
                # Snapshots block removal; unprotect and drop them first.
                for snap in list(image.list_snaps()):
                    name = snap["name"]
                    image.unprotect_snap(name)
                    image.remove_snap(name)
            rbd_inst = rbd.RBD()
            rbd_inst.remove(ioctx, self.name)
        except rbd.ImageNotFound:
            pass

    def delete(self):
        self.__with_ceph_connection(self.__delete)

    def __with_ceph_connection(self, fun, *args, **kwargs):
        """Run fun(ioctx, *args, **kwargs) inside a fresh Ceph connection."""
        with CephConnection(self.dir) as conn:
            return fun(conn.ioctx, *args, **kwargs)

    @classmethod
    def get(cls, ioctx, pool_name, name):
        """Create disk from Ceph block"""
        with rbd.Image(ioctx, name) as image:
            disk_info = image.stat()
            # NOTE(review): stat()["size"] is the image's virtual size in
            # librbd; verify the size/actual_size mapping is intended.
            size = disk_info["num_objs"] * disk_info["obj_size"]
            actual_size = disk_info["size"]
            parent = ""
            type = "normal"
            try:
                parent_info = image.parent_info()
                parent = parent_info[1]
                type = "snapshot"
            except rbd.ImageNotFound:
                pass  # has not got parent
            # Use cls so subclasses get instances of themselves.
            return cls(pool_name, name, "rbd", type,
                       size, parent, actual_size)

    @classmethod
    def list(cls, pool_name):
        """ List all blocks in <pool_name> pool."""
        with CephConnection(pool_name=pool_name) as conn:
            rbd_inst = rbd.RBD()
            return [cls.get(conn.ioctx, pool_name, image_name)
                    for image_name in rbd_inst.list(conn.ioctx)]
from disk import Disk
from disk import Disk, CephDisk, CephConnection
from storagecelery import celery
from os import path, unlink, statvfs, listdir, mkdir
from shutil import move
......@@ -23,7 +23,12 @@ def list_files(datastore):
@celery.task()
def create(disk_desc):
    """Create the disk described by *disk_desc*, dispatching on store type."""
    if disk_desc["data_store_type"] == "ceph_block":
        disk = CephDisk.deserialize(disk_desc)
    else:
        # BUG FIX: was Disk.desrialize (typo), which would raise
        # AttributeError for every non-Ceph data store.
        disk = Disk.deserialize(disk_desc)
    disk.create()
......@@ -43,7 +48,12 @@ class download(AbortableTask):
@celery.task()
def delete(json_data):
    """Delete the disk described by *json_data*."""
    is_ceph = json_data["data_store_type"] == "ceph_block"
    disk_cls = CephDisk if is_ceph else Disk
    disk = disk_cls.deserialize(json_data)
    disk.delete()
......@@ -55,7 +65,12 @@ def delete_dump(disk_path):
@celery.task()
def snapshot(json_data):
    """Snapshot the disk described by *json_data*."""
    is_ceph = json_data["data_store_type"] == "ceph_block"
    disk_cls = CephDisk if is_ceph else Disk
    disk = disk_cls.deserialize(json_data)
    disk.snapshot()
......@@ -66,14 +81,30 @@ class merge(AbortableTask):
old_json = kwargs['old_json']
new_json = kwargs['new_json']
parent_id = kwargs.get("parent_id", None)
disk = Disk.deserialize(old_json)
new_disk = Disk.deserialize(new_json)
disk = None
new_disk = None
if old_json["data_store_type"] == "ceph_block":
disk = CephDisk.deserialize(old_json)
new_disk = CephDisk.deserialize(new_json)
else:
disk = Disk.deserialize(old_json)
new_disk = Disk.deserialize(new_json)
disk.merge(self, new_disk, parent_id=parent_id)
@celery.task()
def get(json_data):
    """Return the serialized description of the requested disk."""
    dir = json_data['dir']
    name = json_data['name']
    if json_data["data_store_type"] == "ceph_block":
        # Ceph blocks need a live pool connection to be introspected.
        with CephConnection(dir) as conn:
            disk = CephDisk.get(conn.ioctx, pool_name=dir, name=name)
    else:
        disk = Disk.get(dir=dir, name=name)
    return disk.get_desc()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment