Commit dbfb53e6 by Bach Dániel

rewrite merge task

parent 5b820fc0
...@@ -4,10 +4,11 @@ import subprocess ...@@ -4,10 +4,11 @@ import subprocess
import re import re
import logging import logging
import magic import magic
from shutil import move, copy, copyfileobj from shutil import move, copyfileobj
from zipfile import ZipFile, is_zipfile from zipfile import ZipFile, is_zipfile
from zlib import decompressobj, MAX_WBITS from zlib import decompressobj, MAX_WBITS
from bz2 import BZ2Decompressor from bz2 import BZ2Decompressor
from time import sleep
import requests import requests
...@@ -18,6 +19,10 @@ re_qemu_img = re.compile(r'(file format: (?P<format>(qcow2|raw))|' ...@@ -18,6 +19,10 @@ re_qemu_img = re.compile(r'(file format: (?P<format>(qcow2|raw))|'
r'backing file: \S+ \(actual path: (?P<base>\S+)\))$') r'backing file: \S+ \(actual path: (?P<base>\S+)\))$')
class AbortException(Exception):
pass
class Disk(object): class Disk(object):
''' Storage driver DISK object. ''' Storage driver DISK object.
...@@ -143,8 +148,6 @@ class Disk(object): ...@@ -143,8 +148,6 @@ class Disk(object):
raise Exception("Invalid response status code: %s at %s" % raise Exception("Invalid response status code: %s at %s" %
(r.status_code, url)) (r.status_code, url))
class AbortException(Exception):
pass
if task.is_aborted(): if task.is_aborted():
raise AbortException() raise AbortException()
if parent_id is None: if parent_id is None:
...@@ -248,65 +251,66 @@ class Disk(object): ...@@ -248,65 +251,66 @@ class Disk(object):
# Call subprocess # Call subprocess
subprocess.check_output(cmdline) subprocess.check_output(cmdline)
def merge_disk_with_base(self, task, new_disk):
try:
cmdline = [
'qemu-img', 'convert', self.get_path(),
'-O', new_disk.format, new_disk.get_path()]
# Call subprocess
logger.debug(
"Merging %s into %s.", self.get_path(),
new_disk.get_path())
proc = subprocess.Popen(cmdline)
while True:
if proc.poll() is not None:
break
if task.is_aborted():
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
sleep(1)
except AbortException:
proc.terminate()
logger.warning("Aborted remove %s", new_disk.get_path())
os.unlink(new_disk.get_path())
def merge_disk_without_base(self, task, new_disk, length=1024 * 1024):
try:
fsrc = open(self.get_path(), 'rb')
fdst = open(new_disk.get_path(), 'wb')
with fsrc, fdst:
while True:
buf = fsrc.read(length)
if not buf:
break
fdst.write(buf)
if task.is_aborted():
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
except AbortException:
logger.warning("Aborted remove %s", new_disk.get_path())
os.unlink(new_disk.get_path())
def merge(self, task, new_disk): def merge(self, task, new_disk):
""" Merging a new_disk from the actual disk and its base. """ Merging a new_disk from the actual disk and its base.
""" """
from time import sleep
import threading
class AbortException(Exception):
pass
if task.is_aborted(): if task.is_aborted():
raise AbortException() raise AbortException()
# Check if file already exists # Check if file already exists
if os.path.isfile(new_disk.get_path()): if os.path.isfile(new_disk.get_path()):
raise Exception('File already exists: %s' % self.get_path()) raise Exception('File already exists: %s' % self.get_path())
if self.format == "iso": if self.format == "iso":
os.symlink(self.get_path(), new_disk.get_path()) os.symlink(self.get_path(), new_disk.get_path())
elif self.base_name: elif self.base_name:
try: self.merge_disk_with_base(task, new_disk)
cmdline = ['qemu-img',
'convert',
self.get_path(),
'-O', new_disk.format,
new_disk.get_path()]
# Call subprocess
logger.debug(
"Merging %s into %s.", self.get_path(),
new_disk.get_path())
proc = subprocess.Popen(cmdline)
while True:
if proc.poll() is not None:
break
if task.is_aborted():
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException
sleep(1)
except AbortException:
proc.terminate()
logger.warning("Aborted remove %s", new_disk.get_path())
os.unlink(new_disk.get_path())
else: else:
try: self.merge_disk_without_base(task, new_disk)
t = threading.Thread(
target=copy,
args=(
self.get_path(),
new_disk.get_path()))
while True:
if not t.isAlive():
break
if task.is_aborted():
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException
except AbortException:
t._Thread__stop()
logger.warning("Aborted remove %s", new_disk.get_path())
os.unlink(new_disk.get_path())
def delete(self): def delete(self):
""" Delete file. """ """ Delete file. """
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment