Commit 0377dff6 by Bach Dániel

Merge branch 'feature-abort-save_as' into 'master'

Feature Abort Save As

Make save_as job abortable.

qcow2-snapshot image handled by Process.Popen (qemu-img)
qcow2-norm handled by Threded shutil copy
iso-s simlink is not abortable (fast operation)
parents c4667cc9 dbfb53e6
...@@ -4,10 +4,11 @@ import subprocess ...@@ -4,10 +4,11 @@ import subprocess
import re import re
import logging import logging
import magic import magic
from shutil import move, copy, copyfileobj from shutil import move, copyfileobj
from zipfile import ZipFile, is_zipfile from zipfile import ZipFile, is_zipfile
from zlib import decompressobj, MAX_WBITS from zlib import decompressobj, MAX_WBITS
from bz2 import BZ2Decompressor from bz2 import BZ2Decompressor
from time import sleep
import requests import requests
...@@ -18,7 +19,12 @@ re_qemu_img = re.compile(r'(file format: (?P<format>(qcow2|raw))|' ...@@ -18,7 +19,12 @@ re_qemu_img = re.compile(r'(file format: (?P<format>(qcow2|raw))|'
r'backing file: \S+ \(actual path: (?P<base>\S+)\))$') r'backing file: \S+ \(actual path: (?P<base>\S+)\))$')
class AbortException(Exception):
pass
class Disk(object): class Disk(object):
''' Storage driver DISK object. ''' Storage driver DISK object.
Handle qcow2, raw and iso images. Handle qcow2, raw and iso images.
TYPES, CREATE_TYPES, SNAPSHOT_TYPES are hand managed restrictions. TYPES, CREATE_TYPES, SNAPSHOT_TYPES are hand managed restrictions.
...@@ -139,10 +145,9 @@ class Disk(object): ...@@ -139,10 +145,9 @@ class Disk(object):
logger.info("Downloading image from %s to %s", url, disk_path) logger.info("Downloading image from %s to %s", url, disk_path)
r = requests.get(url, stream=True) r = requests.get(url, stream=True)
if r.status_code != 200: if r.status_code != 200:
raise Exception("Invalid response status code: %s" % r.status_code) raise Exception("Invalid response status code: %s at %s" %
(r.status_code, url))
class AbortException(Exception):
pass
if task.is_aborted(): if task.is_aborted():
raise AbortException() raise AbortException()
if parent_id is None: if parent_id is None:
...@@ -200,7 +205,7 @@ class Disk(object): ...@@ -200,7 +205,7 @@ class Disk(object):
if not self.check_valid_image(): if not self.check_valid_image():
os.unlink(disk_path) os.unlink(disk_path)
raise Exception("Invalid file format. Only qcow and " raise Exception("Invalid file format. Only qcow and "
"iso files are allowed.") "iso files are allowed. Image from: %s" % url)
def extract_iso_from_zip(self, disk_path): def extract_iso_from_zip(self, disk_path):
with ZipFile(disk_path, 'r') as z: with ZipFile(disk_path, 'r') as z:
...@@ -246,24 +251,66 @@ class Disk(object): ...@@ -246,24 +251,66 @@ class Disk(object):
# Call subprocess # Call subprocess
subprocess.check_output(cmdline) subprocess.check_output(cmdline)
def merge(self, new_disk): def merge_disk_with_base(self, task, new_disk):
try:
cmdline = [
'qemu-img', 'convert', self.get_path(),
'-O', new_disk.format, new_disk.get_path()]
# Call subprocess
logger.debug(
"Merging %s into %s.", self.get_path(),
new_disk.get_path())
proc = subprocess.Popen(cmdline)
while True:
if proc.poll() is not None:
break
if task.is_aborted():
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
sleep(1)
except AbortException:
proc.terminate()
logger.warning("Aborted remove %s", new_disk.get_path())
os.unlink(new_disk.get_path())
def merge_disk_without_base(self, task, new_disk, length=1024 * 1024):
try:
fsrc = open(self.get_path(), 'rb')
fdst = open(new_disk.get_path(), 'wb')
with fsrc, fdst:
while True:
buf = fsrc.read(length)
if not buf:
break
fdst.write(buf)
if task.is_aborted():
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
except AbortException:
logger.warning("Aborted remove %s", new_disk.get_path())
os.unlink(new_disk.get_path())
def merge(self, task, new_disk):
""" Merging a new_disk from the actual disk and its base. """ Merging a new_disk from the actual disk and its base.
""" """
if task.is_aborted():
raise AbortException()
# Check if file already exists # Check if file already exists
if os.path.isfile(new_disk.get_path()): if os.path.isfile(new_disk.get_path()):
raise Exception('File already exists: %s' % self.get_path()) raise Exception('File already exists: %s' % self.get_path())
if self.format == "iso": if self.format == "iso":
os.symlink(self.get_path(), new_disk.get_path()) os.symlink(self.get_path(), new_disk.get_path())
elif self.base_name: elif self.base_name:
cmdline = ['qemu-img', self.merge_disk_with_base(task, new_disk)
'convert',
self.get_path(),
'-O', new_disk.format,
new_disk.get_path()]
# Call subprocess
subprocess.check_output(cmdline)
else: else:
copy(self.get_path(), new_disk.get_path()) self.merge_disk_without_base(task, new_disk)
def delete(self): def delete(self):
""" Delete file. """ """ Delete file. """
......
...@@ -58,11 +58,15 @@ def snapshot(json_data): ...@@ -58,11 +58,15 @@ def snapshot(json_data):
disk.snapshot() disk.snapshot()
@celery.task() class merge(AbortableTask):
def merge(old_json, new_json): time_limit = 18000
disk = Disk.deserialize(old_json)
new_disk = Disk.deserialize(new_json) def run(self, **kwargs):
disk.merge(new_disk) old_json = kwargs['old_json']
new_json = kwargs['new_json']
disk = Disk.deserialize(old_json)
new_disk = Disk.deserialize(new_json)
disk.merge(self, new_disk)
@celery.task() @celery.task()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment