Commit f6b9c733 by Czémán Arnold

storagedriver: Reimplemenet some function for Ceph support and new garbage collector logic

disk: add timeout handling for CephConnection

Modified storagedriver functions:
* list()
* list_files()
* create()
* delete()
* snapshot()
* get()
* move_to_trash(): removed
* recover_from_trash(): renamed to is_exists, and reworked
* make_free_space(): reworked
parent 5d8b4cac
...@@ -444,12 +444,15 @@ class CephConnection: ...@@ -444,12 +444,15 @@ class CephConnection:
self.ioctx = None self.ioctx = None
def __enter__(self): def __enter__(self):
try:
if self.ceph_config is None: if self.ceph_config is None:
self.ceph_config = os.getenv("CEPH_CONFIG", "/etc/ceph/ceph.conf") self.ceph_config = os.getenv("CEPH_CONFIG",
self.cluster = rados.Rados(conffile=self.ceph_config) "/etc/ceph/ceph.conf")
self.cluster.connect() self.cluster = rados.Rados(conffile=self.ceph_config)
self.ioctx = self.cluster.open_ioctx(self.pool_name) self.cluster.connect()
self.ioctx = self.cluster.open_ioctx(self.pool_name)
except rados.TimedOut as e:
raise Exception(e)
return self return self
......
from disk import Disk, CephDisk, CephConnection from disk import Disk, CephDisk, CephConnection
from storagecelery import celery from storagecelery import celery
from os import path, unlink, statvfs, listdir, mkdir import os
from shutil import move from os import unlink, statvfs, listdir
from celery.contrib.abortable import AbortableTask from celery.contrib.abortable import AbortableTask
import logging import logging
import rbd
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
trash_directory = "trash" trash_directory = "trash"
@celery.task() @celery.task()
def list(dir): def list(data_store_type, dir):
if data_store_type == "ceph_block":
return [d.get_desc() for d in CephDisk.list(dir)]
return [d.get_desc() for d in Disk.list(dir)] return [d.get_desc() for d in Disk.list(dir)]
@celery.task() @celery.task()
def list_files(datastore): def list_files(data_store_type, dir):
return [l for l in listdir(datastore) if
path.isfile(path.join(datastore, l))] if data_store_type == "ceph_block":
with CephConnection(str(dir)) as conn:
rbd_inst = rbd.RBD()
return rbd_inst.list(conn.ioctx)
else:
return [l for l in listdir(dir) if
os.path.isfile(os.path.join(dir, l))]
@celery.task() @celery.task()
...@@ -27,7 +39,7 @@ def create(disk_desc): ...@@ -27,7 +39,7 @@ def create(disk_desc):
if disk_desc["data_store_type"] == "ceph_block": if disk_desc["data_store_type"] == "ceph_block":
disk = CephDisk.deserialize(disk_desc) disk = CephDisk.deserialize(disk_desc)
else: else:
disk = Disk.desrialize(disk_desc) disk = Disk.deserialize(disk_desc)
disk.create() disk.create()
...@@ -47,29 +59,30 @@ class download(AbortableTask): ...@@ -47,29 +59,30 @@ class download(AbortableTask):
@celery.task() @celery.task()
def delete(json_data): def delete(disk_desc):
disk = None disk = None
if json_data["data_store_type"] == "ceph_block": if disk_desc["data_store_type"] == "ceph_block":
disk = CephDisk.deserialize(json_data) disk = CephDisk.deserialize(disk_desc)
else: else:
disk = Disk.deserialize(json_data) disk = Disk.deserialize(disk_desc)
disk.delete() disk.delete()
@celery.task() @celery.task()
def delete_dump(disk_path): def delete_dump(data_store_type, disk_path):
if disk_path.endswith(".dump") and path.isfile(disk_path):
if disk_path.endswith(".dump") and os.path.isfile(disk_path):
unlink(disk_path) unlink(disk_path)
@celery.task() @celery.task()
def snapshot(json_data): def snapshot(disk_desc):
disk = None disk = None
if json_data["data_store_type"] == "ceph_block": if disk_desc["data_store_type"] == "ceph_block":
disk = CephDisk.deserialize(json_data) disk = CephDisk.deserialize(disk_desc)
else: else:
disk = Disk.deserialize(json_data) disk = Disk.deserialize(disk_desc)
disk.snapshot() disk.snapshot()
...@@ -95,15 +108,16 @@ class merge(AbortableTask): ...@@ -95,15 +108,16 @@ class merge(AbortableTask):
@celery.task() @celery.task()
def get(json_data): def get(disk_desc):
disk = None disk = None
dir = json_data['dir'] dir = disk_desc['dir']
if json_data["data_store_type"] == "ceph_block":
if disk_desc["data_store_type"] == "ceph_block":
with CephConnection(dir) as conn: with CephConnection(dir) as conn:
disk = CephDisk.get(conn.ioctx, pool_name=dir, disk = CephDisk.get(conn.ioctx, pool_name=dir,
name=json_data['name']) name=disk_desc['name'])
else: else:
disk = Disk.get(dir=dir, name=json_data['name']) disk = Disk.get(dir=dir, name=disk_desc['name'])
return disk.get_desc() return disk.get_desc()
...@@ -129,46 +143,43 @@ def get_storage_stat(data_store_type, path): ...@@ -129,46 +143,43 @@ def get_storage_stat(data_store_type, path):
@celery.task @celery.task
def move_to_trash(datastore, disk_name): def is_exists(data_store_type, path, disk_name):
''' Move path to the trash directory.
'''
trash_path = path.join(datastore, trash_directory)
disk_path = path.join(datastore, disk_name)
if not path.isdir(trash_path):
mkdir(trash_path)
# TODO: trash dir configurable?
move(disk_path, trash_path)
@celery.task
def recover_from_trash(datastore, disk_name):
''' Recover named disk from the trash directory. ''' Recover named disk from the trash directory.
''' '''
if path.exists(path.join(datastore, disk_name)): if data_store_type == "ceph_block":
return False try:
disk_path = path.join(datastore, trash_directory, disk_name) with CephConnection(str(path)) as conn:
# TODO: trash dir configurable? with rbd.Image(conn.ioctx, disk_name):
move(disk_path, datastore) pass
return True except rbd.ImageNotFound:
return False
else:
return True
elif os.path.exists(os.path.join(path, disk_name)):
return True
return False
@celery.task @celery.task
def make_free_space(datastore, percent=10): def make_free_space(data_store_type, path, deletable_disks, percent=10):
''' Check for free space on datastore. ''' Check for free space on datastore.
If free space is less than the given percent If free space is less than the given percent
removes oldest files to satisfy the given requirement. removes oldest files to satisfy the given requirement.
''' '''
trash_path = path.join(datastore, trash_directory)
files = sorted(listdir(trash_path),
key=lambda x: path.getctime(path.join(trash_path, x)))
logger.info("Free space on datastore: %s" % logger.info("Free space on datastore: %s" %
get_storage_stat(trash_path).get('free_percent')) get_storage_stat(path).get('free_percent'))
while get_storage_stat(trash_path).get('free_percent') < percent: while get_storage_stat(path).get('free_percent') < percent:
logger.debug(get_storage_stat(trash_path)) logger.debug(get_storage_stat(path))
try: try:
f = files.pop(0) f = deletable_disks.pop(0)
unlink(path.join(trash_path, f)) if data_store_type == "ceph_block":
with CephConnection(str(path)) as conn:
rbd_inst = rbd.RBD()
rbd_inst.remove(conn.ioctx, f)
else:
unlink(os.path.join(path, f))
logger.info('Image: %s removed.' % f) logger.info('Image: %s removed.' % f)
except IndexError: except IndexError:
raise Exception("Trash folder is empty.") raise Exception("Has no deletable disk.")
return True return True
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment