Commit d2bf1727 by Czémán Arnold

disk, storagedriver: add ceph support for disk dowload, implement CephDisk download method

util: move CephConnection to util.py
parent f6b9c733
......@@ -13,8 +13,10 @@ import re
import requests
import rados
import rbd
from rbd import InvalidArgument, ImageNotFound
from util import CephConnection
logger = logging.getLogger(__name__)
......@@ -22,7 +24,7 @@ re_qemu_img = re.compile(r'(file format: (?P<format>(qcow2|raw))|'
r'virtual size: \w+ \((?P<size>[0-9]+) bytes\)|'
r'backing file: \S+ \(actual path: (?P<base>\S+)\))$')
maximum_size = float(os.getenv("DOWNLOAD_MAX_SIZE", 1024*1024*1024*10))
MAXIMUM_SIZE = float(os.getenv("DOWNLOAD_MAX_SIZE", 1024*1024*1024*10))
class AbortException(Exception):
......@@ -214,8 +216,8 @@ class Disk(object):
# undocumented zlib feature http://stackoverflow.com/a/2424549
elif ext == 'bz2':
decompressor = BZ2Decompressor()
clen = int(r.headers.get('content-length', maximum_size))
if clen > maximum_size:
clen = int(r.headers.get('content-length', MAXIMUM_SIZE))
if clen > MAXIMUM_SIZE:
raise FileTooBig()
percent = 0
try:
......@@ -225,7 +227,7 @@ class Disk(object):
chunk = decompressor.decompress(chunk)
f.write(chunk)
actsize = f.tell()
if actsize > maximum_size:
if actsize > MAXIMUM_SIZE:
raise FileTooBig()
new_percent = min(100, round(actsize * 100.0 / clen))
if new_percent > percent:
......@@ -251,7 +253,7 @@ class Disk(object):
except FileTooBig:
os.unlink(disk_path)
raise Exception("%s file is too big. Maximum size "
"is %s" % url, maximum_size)
"is %s" % (url, MAXIMUM_SIZE))
except:
os.unlink(disk_path)
logger.error("Download %s failed, %s removed.",
......@@ -434,38 +436,8 @@ class Disk(object):
return [cls.get(dir, file) for file in os.listdir(dir)]
class CephConnection:
def __init__(self, pool_name, ceph_config=None):
self.pool_name = pool_name
self.ceph_config = ceph_config
self.cluster = None
self.ioctx = None
def __enter__(self):
try:
if self.ceph_config is None:
self.ceph_config = os.getenv("CEPH_CONFIG",
"/etc/ceph/ceph.conf")
self.cluster = rados.Rados(conffile=self.ceph_config)
self.cluster.connect()
self.ioctx = self.cluster.open_ioctx(self.pool_name)
except rados.TimedOut as e:
raise Exception(e)
return self
def __exit__(self, type, value, traceback):
self.ioctx.close()
self.cluster.shutdown()
class CephDisk(Disk):
# TODO: iso handling
TYPES = ('snapshot', 'normal')
def __init__(self, dir, name, format, type, size,
......@@ -480,7 +452,7 @@ class CephDisk(Disk):
self.dir = dir
@property
def checksum(self, blocksize):
def checksum(self, blocksize=65536):
raise NotImplementedError()
@classmethod
......@@ -515,15 +487,89 @@ class CephDisk(Disk):
raise Exception('Ceph image already exists: %s' % self.get_path())
def create(self):
self.__with_ceph_connection(self.__create)
def check_valid_image(self):
"""Check wether the downloaded image is valid.
Set the proper type for valid images."""
# TODO
return True
def download(self, task, url, parent_id=None):
# TODO
"""Download image from url."""
# TODO: zip support
disk_path = self.get_path()
logger.info("Downloading image from %s to %s", url, disk_path)
r = requests.get(url, stream=True)
if r.status_code != 200:
raise Exception("Invalid response status code: %s at %s" %
(r.status_code, url))
if task.is_aborted():
raise AbortException()
if parent_id is None:
parent_id = task.request.id
chunk_size = 256 * 1024
ext = url.split('.')[-1].lower()
if ext == 'gz':
decompressor = decompressobj(16 + MAX_WBITS)
# undocumented zlib feature http://stackoverflow.com/a/2424549
elif ext == 'bz2':
decompressor = BZ2Decompressor()
clen = int(r.headers.get('content-length', MAXIMUM_SIZE))
if clen > MAXIMUM_SIZE:
raise FileTooBig()
percent = 0
try:
with CephConnection(self.dir) as conn:
rbd_inst = rbd.RBD()
# keep calm, Ceph Block Device uses thin-provisioning
rbd_inst.create(conn.ioctx, self.name, int(MAXIMUM_SIZE))
with rbd.Image(conn.ioctx, self.name) as image:
offset = 0
for chunk in r.iter_content(chunk_size=chunk_size):
if ext in ('gz', 'bz'):
chunk = decompressor.decompress(chunk)
offset += image.write(chunk, offset)
actsize = offset
if actsize > MAXIMUM_SIZE:
raise FileTooBig()
new_percent = min(100, round(actsize * 100.0 / clen))
if new_percent > percent:
percent = new_percent
if not task.is_aborted():
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'size': actsize, 'percent': percent})
else:
raise AbortException()
if ext == 'gz':
image.write(decompressor.flush(), offset)
image.flush()
self.size = CephDisk.get(conn.ioctx, self.dir, self.name).size
logger.debug("Download finished %s (%s bytes)",
self.name, self.size)
except AbortException:
self.__remove_disk()
logger.info("Download %s aborted %s removed.",
url, disk_path)
except (FileTooBig, InvalidArgument):
self.__remove_disk()
raise Exception("%s file is too big. Maximum size "
"is %s" % (url, MAXIMUM_SIZE))
except Exception as e:
self.__remove_disk()
logger.error("Error occured %s. Download %s failed, %s removed.",
e, url, disk_path)
raise
def __remove_disk(self):
with CephConnection(self.dir) as conn:
rbd_inst = rbd.RBD()
try:
rbd_inst.remove(conn.ioctx, self.name)
except ImageNotFound:
pass
def __snapshot(self, ioctx):
......
from disk import Disk, CephDisk, CephConnection
from disk import Disk, CephDisk
from util import CephConnection
from storagecelery import celery
import os
from os import unlink, statvfs, listdir
......@@ -51,11 +52,16 @@ class download(AbortableTask):
disk_desc = kwargs['disk']
url = kwargs['url']
parent_id = kwargs.get("parent_id", None)
disk = None
if disk_desc["data_store_type"] == "ceph_block":
disk = CephDisk.deserialize(disk_desc)
else:
disk = Disk.deserialize(disk_desc)
disk.download(self, url, parent_id)
return {'size': disk.size,
'type': disk.format,
'checksum': disk.checksum, }
'checksum': 0, } # TODO: disk.checksum
@celery.task()
......
import rados
import os
class CephConnection:
def __init__(self, pool_name, ceph_config=None):
self.pool_name = pool_name
self.ceph_config = ceph_config
self.cluster = None
self.ioctx = None
def __enter__(self):
try:
if self.ceph_config is None:
self.ceph_config = os.getenv("CEPH_CONFIG",
"/etc/ceph/ceph.conf")
self.cluster = rados.Rados(conffile=self.ceph_config)
self.cluster.connect(timeout=2)
self.ioctx = self.cluster.open_ioctx(self.pool_name)
except rados.InterruptedOrTimeoutError as e:
raise Exception(e)
return self
def __exit__(self, type, value, traceback):
self.ioctx.close()
self.cluster.shutdown()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment