Commit e941545f by Guba Sándor

add status info to merge

parent 12a7d752
......@@ -60,6 +60,7 @@ class Disk(object):
'name': self.name,
'dir': self.dir,
'format': self.format,
'type': self.type,
'size': self.size,
'actual_size': self.actual_size,
'base_name': self.base_name,
......@@ -88,7 +89,13 @@ class Disk(object):
format = disk_info.get('format')
size = disk_info.get('virtual-size')
actual_size = disk_info.get('actual-size')
base_name = disk_info.get('backing-filename')
# Check if disk has base (backing-image)
base_path = disk_info.get('backing-filename')
if base_path:
base_name = os.path.basename(base_path)
else:
base_name = None
# Based on backing image determine weather snapshot ot normal image
if base_name:
type = 'snapshot'
else:
......@@ -246,7 +253,7 @@ class Disk(object):
# Call subprocess
subprocess.check_output(cmdline)
def merge_disk_with_base(self, task, new_disk):
def merge_disk_with_base(self, task, new_disk, parent_id=None):
try:
cmdline = [
'qemu-img', 'convert', self.get_path(),
......@@ -255,41 +262,86 @@ class Disk(object):
logger.debug(
"Merging %s into %s.", self.get_path(),
new_disk.get_path())
percent = 0
diff_disk = Disk.get(self.dir, self.name)
base_disk = Disk.get(self.dir, self.base_name)
clen = max(base_disk.actual_size + diff_disk.actual_size,
diff_disk.size)
output = new_disk.get_path()
proc = subprocess.Popen(cmdline)
while True:
if proc.poll() is not None:
break
if task.is_aborted():
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
try:
actsize = os.path.getsize(output)
except OSError:
actsize = 0
new_percent = min(100, round(actsize * 100.0 / clen))
if new_percent > percent:
percent = new_percent
if not task.is_aborted():
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'size': actsize, 'percent': percent})
else:
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
sleep(1)
except AbortException:
proc.terminate()
logger.warning("Aborted remove %s", new_disk.get_path())
logger.warning("Aborted merge job removing %s",
new_disk.get_path())
os.unlink(new_disk.get_path())
except:
if proc:
proc.terminate()
logger.exception("Unknown error occured removing %s ",
new_disk.get_path())
os.unlink(new_disk.get_path())
raise
def merge_disk_without_base(self, task, new_disk, length=1024 * 1024):
def merge_disk_without_base(self, task, new_disk, parent_id=None,
length=1024*1024):
try:
fsrc = open(self.get_path(), 'rb')
fdst = open(new_disk.get_path(), 'wb')
clen = self.size
actsize = 0
percent = 0
with fsrc, fdst:
while True:
buf = fsrc.read(length)
if not buf:
break
fdst.write(buf)
if task.is_aborted():
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
actsize += len(buf)
new_percent = min(100, round(actsize * 100.0 / clen))
if new_percent > percent:
percent = new_percent
if not task.is_aborted():
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'size': actsize, 'percent': percent})
else:
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
except AbortException:
logger.warning("Aborted remove %s", new_disk.get_path())
os.unlink(new_disk.get_path())
except:
logger.exception("Unknown error occured removing %s ",
new_disk.get_path())
os.unlink(new_disk.get_path())
raise
def merge(self, task, new_disk):
def merge(self, task, new_disk, parent_id=None):
""" Merging a new_disk from the actual disk and its base.
"""
......@@ -303,9 +355,9 @@ class Disk(object):
if self.format == "iso":
os.symlink(self.get_path(), new_disk.get_path())
elif self.base_name:
self.merge_disk_with_base(task, new_disk)
self.merge_disk_with_base(task, new_disk, parent_id)
else:
self.merge_disk_without_base(task, new_disk)
self.merge_disk_without_base(task, new_disk, parent_id)
def delete(self):
""" Delete file. """
......
......@@ -64,9 +64,10 @@ class merge(AbortableTask):
def run(self, **kwargs):
old_json = kwargs['old_json']
new_json = kwargs['new_json']
parent_id = kwargs.get("parent_id", None)
disk = Disk.deserialize(old_json)
new_disk = Disk.deserialize(new_json)
disk.merge(self, new_disk)
disk.merge(self, new_disk, parent_id=parent_id)
@celery.task()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment