Commit 729c191a by Czémán Arnold

Merge branch 'ceph' into new_ceph

Conflicts:
	storagedriver.py
parents 2b6c6fe1 21fe5b9b
ceph.py

import os

import rados


class CephConfig:

    def __init__(self, user=None, config_path=None, keyring_path=None):
        self.user = user or "admin"
        self.config_path = (
            config_path or os.getenv("CEPH_CONFIG", "/etc/ceph/ceph.conf"))
        default_keyring = "/etc/ceph/ceph.client.%s.keyring" % self.user
        self.keyring_path = (
            keyring_path or os.getenv("CEPH_KEYRING", default_keyring))

    def cmd_args(self):
        return ["--keyring", self.keyring_path,
                "--id", self.user,
                "--conf", self.config_path]

class CephConnection:

    def __init__(self, pool_name, conf=None, **kwargs):
        self.pool_name = pool_name
        self.conf = conf or CephConfig(**kwargs)
        self.cluster = None
        self.ioctx = None

    def __enter__(self):
        try:
            self.cluster = rados.Rados(
                conffile=self.conf.config_path,
                conf=dict(keyring=self.conf.keyring_path))
            # NOTE: os.getenv returns a string, rados expects a number
            timeout = int(os.getenv("CEPH_TIMEOUT", 2))
            self.cluster.connect(timeout=timeout)
            self.ioctx = self.cluster.open_ioctx(self.pool_name)
        except rados.InterruptedOrTimeoutError as e:
            raise Exception(e)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.ioctx.close()
        self.cluster.shutdown()
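
# Usage sketch (not part of the commit): CephConnection is a context manager,
# so the cluster handle and the pool ioctx are always cleaned up. The pool
# name "mypool" is hypothetical.
#
#     import rbd
#     with CephConnection("mypool") as conn:
#         print(rbd.RBD().list(conn.ioctx))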
disk.py

@@ -13,13 +13,18 @@ import re
 import requests
+import rbd
+from rbd import InvalidArgument, ImageNotFound
+from ceph import CephConnection

 logger = logging.getLogger(__name__)

 re_qemu_img = re.compile(r'(file format: (?P<format>(qcow2|raw))|'
                          r'virtual size: \w+ \((?P<size>[0-9]+) bytes\)|'
                          r'backing file: \S+ \(actual path: (?P<base>\S+)\))$')

-maximum_size = float(os.getenv("DOWNLOAD_MAX_SIZE", 1024*1024*1024*10))
+MAXIMUM_SIZE = float(os.getenv("DOWNLOAD_MAX_SIZE", 1024*1024*1024*10))


 class AbortException(Exception):
@@ -36,9 +41,9 @@ class Disk(object):
     Handle qcow2, raw and iso images.
     TYPES, CREATE_TYPES, SNAPSHOT_TYPES are hand managed restrictions.
     '''
-    TYPES = ['snapshot', 'normal']
-    FORMATS = ['qcow2', 'raw', 'iso']
-    CREATE_FORMATS = ['qcow2', 'raw']
+    TYPES = ('snapshot', 'normal')
+    FORMATS = ('qcow2', 'raw', 'iso', 'rbd')
+    CREATE_FORMATS = ('qcow2', 'raw', 'rbd')

     def __init__(self, dir, name, format, type, size,
                  base_name, actual_size=0):
@@ -72,6 +77,7 @@ class Disk(object):
         logging.info(desc)
         if isinstance(desc, basestring):
             desc = json.loads(desc)
+        del desc["data_store_type"]
         return cls(**desc)

     def get_desc(self):
@@ -210,8 +216,8 @@ class Disk(object):
             # undocumented zlib feature http://stackoverflow.com/a/2424549
             elif ext == 'bz2':
                 decompressor = BZ2Decompressor()
-            clen = int(r.headers.get('content-length', maximum_size))
-            if clen > maximum_size:
+            clen = int(r.headers.get('content-length', MAXIMUM_SIZE))
+            if clen > MAXIMUM_SIZE:
                 raise FileTooBig()
             percent = 0
             try:
@@ -221,7 +227,7 @@ class Disk(object):
                         chunk = decompressor.decompress(chunk)
                     f.write(chunk)
                     actsize = f.tell()
-                    if actsize > maximum_size:
+                    if actsize > MAXIMUM_SIZE:
                         raise FileTooBig()
                     new_percent = min(100, round(actsize * 100.0 / clen))
                     if new_percent > percent:
@@ -247,7 +253,7 @@ class Disk(object):
         except FileTooBig:
             os.unlink(disk_path)
             raise Exception("%s file is too big. Maximum size "
-                            "is %s" % url, maximum_size)
+                            "is %s" % (url, MAXIMUM_SIZE))
         except:
             os.unlink(disk_path)
             logger.error("Download %s failed, %s removed.",
@@ -307,6 +313,8 @@ class Disk(object):
                        '-b', self.get_base(),
                        '-f', self.format,
                        self.get_path()]
+            logging.info("Snapshot image: %s (%s)" % (self.get_path(),
+                                                      self.get_base()))
             # Call subprocess
             subprocess.check_output(cmdline)
@@ -426,3 +434,294 @@ class Disk(object):
     def list(cls, dir):
         """ List all files in <dir> directory."""
         return [cls.get(dir, file) for file in os.listdir(dir)]

class CephDisk(Disk):

    TYPES = ('snapshot', 'normal')

    def __init__(self, dir, name, format, type, size,
                 base_name, actual_size=0):
        """
        dir: the pool name
        """
        super(CephDisk, self).__init__(dir, name, format, type, size,
                                       base_name, actual_size)
        self.dir = dir

    @property
    def checksum(self):
        # NOTE: a property cannot take arguments, so the block size is
        # fixed locally instead of being a parameter
        blocksize = 65536
        hash = md5()
        with CephConnection(str(self.dir)) as conn:
            with rbd.Image(conn.ioctx, self.name) as image:
                size = image.size()
                offset = 0
                while offset + blocksize <= size:
                    block = image.read(offset, blocksize)
                    hash.update(block)
                    offset += blocksize
                block = image.read(offset, size - offset)
                hash.update(block)
        return hash.hexdigest()
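
    # Usage sketch (not part of the commit): checksum opens its own
    # connection and streams the image in 64 KiB blocks, so arbitrarily
    # large images fit in memory. Pool and image names are hypothetical.
    #
    #     with CephConnection("mypool") as conn:
    #         disk = CephDisk.get(conn.ioctx, "mypool", "disk-0001")
    #     print(disk.checksum)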

    @classmethod
    def deserialize(cls, desc):
        """Create cls object from JSON."""
        logging.info(desc)
        if isinstance(desc, basestring):
            desc = json.loads(desc)
        del desc["data_store_type"]
        return cls(**desc)

    def get_path(self):
        return "rbd:%s/%s" % (self.dir, self.name)

    def get_base(self):
        return "rbd:%s/%s" % (self.dir, self.base_name)

    def __create(self, ioctx):
        if self.format != "rbd":
            raise Exception('Invalid format: %s' % self.format)
        if self.type != 'normal':
            raise Exception('Invalid type: %s' % self.type)
        try:
            rbd_inst = rbd.RBD()
            logging.info("Create ceph block: %s (%s)" % (self.get_path(),
                                                         str(self.size)))
            # NOTE: http://docs.ceph.com/docs/master/rbd/rbd-snapshot/#layering
            rbd_inst.create(ioctx, self.name, self.size, old_format=False,
                            features=rbd.RBD_FEATURE_LAYERING)
        except rbd.ImageExists:
            raise Exception('Ceph image already exists: %s' % self.get_path())

    def create(self):
        self.__with_ceph_connection(self.__create)

    def check_valid_image(self):
        """Check whether the downloaded image is valid.
        Set the proper type for valid images."""
        format_map = [
            ("iso", "iso"),
            ("x86 boot sector", "iso")
        ]
        buff = None
        with CephConnection(str(self.dir)) as conn:
            with rbd.Image(conn.ioctx, self.name) as image:
                # the first 2 KiB should be enough to determine the file type
                buff = image.read(0, 2048)
        with magic.Magic() as m:
            ftype = m.id_buffer(buff)
            logger.debug("Downloaded file type is: %s", ftype)
        for file_type, disk_format in format_map:
            if file_type in ftype.lower():
                self.format = disk_format
                return True
        return False

    def download(self, task, url, parent_id=None):
        """Download image from url."""
        # TODO: zip support
        disk_path = self.get_path()
        logger.info("Downloading image from %s to %s", url, disk_path)
        r = requests.get(url, stream=True)
        if r.status_code != 200:
            raise Exception("Invalid response status code: %s at %s" %
                            (r.status_code, url))

        if task.is_aborted():
            raise AbortException()
        if parent_id is None:
            parent_id = task.request.id
        chunk_size = 256 * 1024
        ext = url.split('.')[-1].lower()
        if ext == 'gz':
            # undocumented zlib feature http://stackoverflow.com/a/2424549
            decompressor = decompressobj(16 + MAX_WBITS)
        elif ext == 'bz2':
            decompressor = BZ2Decompressor()
        elif ext == 'zip':
            raise Exception("The zip format is not supported "
                            "with Ceph Block Device")
        clen = int(r.headers.get('content-length', MAXIMUM_SIZE))
        if clen > MAXIMUM_SIZE:
            raise FileTooBig()
        percent = 0
        try:
            with CephConnection(self.dir) as conn:
                rbd_inst = rbd.RBD()
                # keep calm, Ceph Block Device uses thin provisioning
                rbd_inst.create(conn.ioctx, self.name, int(MAXIMUM_SIZE),
                                old_format=False,
                                features=rbd.RBD_FEATURE_LAYERING)
                with rbd.Image(conn.ioctx, self.name) as image:
                    offset = 0
                    actsize = 0
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        if ext in ('gz', 'bz2'):
                            chunk = decompressor.decompress(chunk)
                        offset += image.write(chunk, offset)
                        actsize = offset
                        if actsize > MAXIMUM_SIZE:
                            raise FileTooBig()
                        new_percent = min(100, round(actsize * 100.0 / clen))
                        if new_percent > percent:
                            percent = new_percent
                            if not task.is_aborted():
                                task.update_state(
                                    task_id=parent_id,
                                    state=task.AsyncResult(parent_id).state,
                                    meta={'size': actsize, 'percent': percent})
                            else:
                                raise AbortException()
                    if ext == 'gz':
                        # count the tail produced by flush() so the resize
                        # below does not truncate it
                        actsize = offset + image.write(decompressor.flush(),
                                                       offset)
                    image.flush()
                    # shrink the image to the real amount of data written
                    image.resize(actsize)
                    self.size = CephDisk.get(conn.ioctx,
                                             self.dir, self.name).size
            logger.debug("Download finished %s (%s bytes)",
                         self.name, self.size)
        except AbortException:
            self.__remove_disk()
            logger.info("Download %s aborted %s removed.",
                        url, disk_path)
        except (FileTooBig, InvalidArgument):
            self.__remove_disk()
            raise Exception("%s file is too big. Maximum size "
                            "is %s" % (url, MAXIMUM_SIZE))
        except Exception as e:
            self.__remove_disk()
            logger.error("Error occurred %s. Download %s failed, %s removed.",
                         e, url, disk_path)
            raise
        else:
            if not self.check_valid_image():
                self.__remove_disk()
                raise Exception("Invalid file format. Only iso files "
                                "are allowed. Image from: %s" % url)

    def __remove_disk(self):
        with CephConnection(self.dir) as conn:
            rbd_inst = rbd.RBD()
            try:
                rbd_inst.remove(conn.ioctx, self.name)
            except ImageNotFound:
                pass

    def __snapshot(self, ioctx):
        ''' Create a copy-on-write clone from the base image's
        "snapshot" snap.
        '''
        # Check if snapshot type and rbd format match
        if self.type != 'snapshot':
            raise Exception('Invalid type: %s' % self.type)
        if self.format != "rbd":
            raise Exception('Invalid format: %s' % self.format)
        try:
            rbd_inst = rbd.RBD()
            logging.info("Snapshot ceph block: %s (%s)" % (self.get_path(),
                                                           self.get_base()))
            # NOTE: http://docs.ceph.com/docs/master/rbd/rbd-snapshot/#layering
            rbd_inst.clone(ioctx, self.base_name, "snapshot",
                           ioctx, self.name,
                           features=rbd.RBD_FEATURE_LAYERING)
        except rbd.ImageExists:
            # TODO: not enough
            raise Exception('Ceph image already exists: %s' % self.get_path())
        except Exception as e:
            raise Exception("%s: %s" % (type(e), e))

    def snapshot(self):
        self.__with_ceph_connection(self.__snapshot)
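
    # Background sketch (not part of the commit): the clone above is roughly
    # this CLI sequence in Ceph layering terms; the parent must already have
    # a protected snap named "snapshot". Pool and image names are
    # hypothetical.
    #
    #     rbd snap create mypool/base@snapshot
    #     rbd snap protect mypool/base@snapshot
    #     rbd clone mypool/base@snapshot mypool/child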

    def merge_disk_without_base(self, ioctx, task, new_disk, parent_id=None,
                                length=1024 * 1024):
        with rbd.Image(ioctx, self.name) as image:
            logger.debug("Merging %s into %s.",
                         self.get_path(),
                         new_disk.get_path())
            image.copy(ioctx, new_disk.name)
        with rbd.Image(ioctx, new_disk.name) as image:
            image.create_snap("snapshot")
            image.protect_snap("snapshot")

        if not task.is_aborted():
            task.update_state(task_id=parent_id,
                              state=task.AsyncResult(parent_id).state,
                              meta={'size': new_disk.size, 'percent': 100})
        else:
            logger.warning("Merging new disk %s is aborted by user.",
                           new_disk.get_path())
            logger.warning("Aborted merge job, removing %s",
                           new_disk.get_path())
            with rbd.Image(ioctx, new_disk.name) as image:
                image.unprotect_snap("snapshot")
                image.remove_snap("snapshot")
            rbd_inst = rbd.RBD()
            rbd_inst.remove(ioctx, new_disk.name)

    def __merge(self, ioctx, task, new_disk, parent_id=None):
        """ Merge a new_disk from the actual disk and its base.
        """
        if task.is_aborted():
            raise AbortException()
        self.merge_disk_without_base(ioctx, task, new_disk, parent_id)

    def merge(self, task, new_disk, parent_id=None):
        self.__with_ceph_connection(self.__merge, task, new_disk, parent_id)

    def __delete(self, ioctx):
        try:
            logger.debug("Delete ceph block %s" % self.get_path())
            with rbd.Image(ioctx, self.name) as image:
                for snap in list(image.list_snaps()):
                    name = snap["name"]
                    image.unprotect_snap(name)
                    image.remove_snap(name)
            rbd_inst = rbd.RBD()
            rbd_inst.remove(ioctx, self.name)
        except rbd.ImageNotFound:
            pass

    def delete(self):
        self.__with_ceph_connection(self.__delete)

    def __with_ceph_connection(self, fun, *args, **kwargs):
        with CephConnection(self.dir) as conn:
            return fun(conn.ioctx, *args, **kwargs)

    @classmethod
    def get(cls, ioctx, pool_name, name):
        """Create disk from Ceph block"""
        with rbd.Image(ioctx, name) as image:
            disk_info = image.stat()
            size = disk_info["num_objs"] * disk_info["obj_size"]
            actual_size = disk_info["size"]
            parent = ""
            type = "normal"
            try:
                parent_info = image.parent_info()
                parent = parent_info[1]
                type = "snapshot"
            except rbd.ImageNotFound:
                pass  # the image has no parent
            return CephDisk(pool_name, name, "rbd", type,
                            size, parent, actual_size)

    @classmethod
    def list(cls, pool_name):
        """ List all blocks in <pool_name> pool."""
        with CephConnection(pool_name=pool_name) as conn:
            rbd_inst = rbd.RBD()
            return [cls.get(conn.ioctx, pool_name, name)
                    for name in rbd_inst.list(conn.ioctx)]
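
# Usage sketch (not part of the commit): listing every block in a pool as
# CephDisk objects; the pool name is hypothetical.
#
#     for disk in CephDisk.list("mypool"):
#         print(disk.get_desc())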
requirements.txt

celery==3.1.17
requests==2.5.3
filemagic==1.6
python-cephlibs==0.94.5-1
storagedriver.py

-from disk import Disk
+from disk import Disk, CephDisk
+from ceph import CephConnection
 from storagecelery import celery
 import os
-from os import path, unlink, statvfs, listdir
+from os import unlink, statvfs, listdir
 from celery.contrib.abortable import AbortableTask
 import logging
+import rbd

 logger = logging.getLogger(__name__)

 trash_directory = "trash"

 @celery.task()
-def list(dir):
-    return [d.get_desc() for d in Disk.list(dir)]
+def list(data_store_type, dir):
+    cls = CephDisk if data_store_type == "ceph_block" else Disk
+    return [d.get_desc() for d in cls.list(dir)]

 @celery.task()
-def list_files(datastore):
-    return [l for l in listdir(datastore) if
-            path.isfile(path.join(datastore, l))]
+def list_files(data_store_type, dir):
+    if data_store_type == "ceph_block":
+        with CephConnection(str(dir)) as conn:
+            rbd_inst = rbd.RBD()
+            return rbd_inst.list(conn.ioctx)
+    else:
+        return [l for l in listdir(dir) if
+                os.path.isfile(os.path.join(dir, l))]

 @celery.task()
 def create(disk_desc):
-    disk = Disk.deserialize(disk_desc)
+    cls = CephDisk if disk_desc["data_store_type"] == "ceph_block" else Disk
+    disk = cls.deserialize(disk_desc)
     disk.create()
@@ -34,7 +45,8 @@ class download(AbortableTask):
         disk_desc = kwargs['disk']
         url = kwargs['url']
         parent_id = kwargs.get("parent_id", None)
-        disk = Disk.deserialize(disk_desc)
+        c = CephDisk if disk_desc["data_store_type"] == "ceph_block" else Disk
+        disk = c.deserialize(disk_desc)
         disk.download(self, url, parent_id)
         return {'size': disk.size,
                 'type': disk.format,
@@ -42,20 +54,28 @@ class download(AbortableTask):
 @celery.task()
-def delete(json_data):
-    disk = Disk.deserialize(json_data)
+def delete(disk_desc):
+    cls = CephDisk if disk_desc["data_store_type"] == "ceph_block" else Disk
+    disk = cls.deserialize(disk_desc)
     disk.delete()

 @celery.task()
-def delete_dump(disk_path):
-    if disk_path.endswith(".dump") and path.isfile(disk_path):
-        unlink(disk_path)
+def delete_dump(data_store_type, dir, filename):
+    if data_store_type == "ceph_block":
+        with CephConnection(str(dir)) as conn:
+            rbd_inst = rbd.RBD()
+            rbd_inst.remove(conn.ioctx, str(filename))
+    else:
+        disk_path = dir + "/" + filename
+        if disk_path.endswith(".dump") and os.path.isfile(disk_path):
+            unlink(disk_path)

 @celery.task()
-def snapshot(json_data):
-    disk = Disk.deserialize(json_data)
+def snapshot(disk_desc):
+    cls = CephDisk if disk_desc["data_store_type"] == "ceph_block" else Disk
+    disk = cls.deserialize(disk_desc)
     disk.snapshot()
@@ -66,47 +86,92 @@ class merge(AbortableTask):
         old_json = kwargs['old_json']
         new_json = kwargs['new_json']
         parent_id = kwargs.get("parent_id", None)
-        disk = Disk.deserialize(old_json)
-        new_disk = Disk.deserialize(new_json)
+        cls = CephDisk if old_json["data_store_type"] == "ceph_block" else Disk
+        disk = cls.deserialize(old_json)
+        new_disk = cls.deserialize(new_json)
         disk.merge(self, new_disk, parent_id=parent_id)

 @celery.task()
-def get(json_data):
-    disk = Disk.get(dir=json_data['dir'], name=json_data['name'])
+def get(disk_desc):
+    disk = None
+    dir = disk_desc['dir']
+    if disk_desc["data_store_type"] == "ceph_block":
+        with CephConnection(dir) as conn:
+            disk = CephDisk.get(conn.ioctx, pool_name=dir,
+                                name=disk_desc['name'])
+    else:
+        disk = Disk.get(dir=dir, name=disk_desc['name'])
     return disk.get_desc()

 @celery.task()
-def get_storage_stat(path):
-    ''' Return free disk space avaliable at path in bytes and percent.'''
-    s = statvfs(path)
-    all_space = s.f_bsize * s.f_blocks
-    free_space = s.f_bavail * s.f_frsize
+def get_storage_stat(data_store_type, path):
+    ''' Return free disk space available at path in bytes and percent.'''
+    all_space = 1
+    free_space = 0
+    if data_store_type == "ceph_block":
+        with CephConnection(str(path)) as conn:
+            # NOTE: cluster stats are reported in kilobytes, not bytes
+            stat = conn.cluster.get_cluster_stats()
+            all_space = stat["kb"]
+            free_space = stat["kb_avail"]
+    else:
+        s = statvfs(path)
+        all_space = s.f_bsize * s.f_blocks
+        free_space = s.f_bavail * s.f_frsize
     free_space_percent = 100.0 * free_space / all_space
     return {'free_space': free_space,
             'free_percent': free_space_percent}
 @celery.task
-def exists(path, disk_name):
-    return os.path.exists(os.path.join(path, disk_name))
+def exists(data_store_type, path, disk_name):
+    ''' Check whether the named disk exists on the datastore.
+    '''
+    if data_store_type == "ceph_block":
+        try:
+            with CephConnection(str(path)) as conn:
+                with rbd.Image(conn.ioctx, str(disk_name)):
+                    pass
+        except rbd.ImageNotFound:
+            return False
+        else:
+            return True
+    elif os.path.exists(os.path.join(path, disk_name)):
+        return True
+    return False
 @celery.task
-def make_free_space(path, deletable_disks, percent=10):
+def make_free_space(data_store_type, path, deletable_disks, percent=10):
     ''' Check for free space on datastore.
     If free space is less than the given percent
     removes oldest files to satisfy the given requirement.
     '''
+    ds_type = data_store_type
     logger.info("Free space on datastore: %s" %
-                get_storage_stat(path).get('free_percent'))
-    while get_storage_stat(path).get('free_percent') < percent:
-        logger.debug(get_storage_stat(path))
+                get_storage_stat(ds_type, path).get('free_percent'))
+    while get_storage_stat(ds_type, path).get('free_percent') < percent:
+        logger.debug(get_storage_stat(ds_type, path))
         try:
             f = deletable_disks.pop(0)
-            unlink(os.path.join(path, f))
+            if ds_type == "ceph_block":
+                with CephConnection(str(path)) as conn:
+                    rbd_inst = rbd.RBD()
+                    with rbd.Image(conn.ioctx, str(f)) as image:
+                        for snapshot in image.list_snaps():
+                            name = snapshot["name"]
+                            image.unprotect_snap(name)
+                            image.remove_snap(name)
+                    rbd_inst.remove(conn.ioctx, str(f))
+            else:
+                unlink(os.path.join(path, f))
             logger.info('Image: %s removed.' % f)
         except IndexError:
-            raise Exception("There is not deletable disk.")
+            logger.warning("No deletable disks left.")
+            return False
     return True
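
# Usage sketch (not part of the commit): every task now takes the datastore
# type first, so callers dispatch between Disk and CephDisk per call; the
# queue and pool names below are hypothetical.
#
#     from storagedriver import list_files
#     list_files.apply_async(args=("ceph_block", "mypool"),
#                            queue="storage.fast")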