Commit 7d71f8e0 by Bach Dániel

Merge branch 'disk-download-enhancement' into 'master'

Disk Download Enhancement
parents 0282f543 f3a27e66
...@@ -3,6 +3,7 @@ import os ...@@ -3,6 +3,7 @@ import os
import subprocess import subprocess
import re import re
import logging import logging
import magic
from shutil import move, copy, copyfileobj from shutil import move, copy, copyfileobj
from zipfile import ZipFile, is_zipfile from zipfile import ZipFile, is_zipfile
from zlib import decompressobj, MAX_WBITS from zlib import decompressobj, MAX_WBITS
...@@ -116,68 +117,90 @@ class Disk(object): ...@@ -116,68 +117,90 @@ class Disk(object):
# Call subprocess # Call subprocess
subprocess.check_output(cmdline) subprocess.check_output(cmdline)
def check_valid_image(self):
    """Check whether the downloaded image has an accepted file type.

    The file at ``self.get_path()`` is identified with ``magic`` and the
    lower-cased description is searched for each known keyword, in order.
    On the first match ``self.format`` is set to the corresponding disk
    format.

    Returns True when a known type was recognised, False otherwise
    (``self.format`` is left untouched in that case).
    """
    # (keyword looked for in the magic description, disk format to set)
    known_types = (
        ("qcow", "qcow2-norm"),
        ("iso", "iso"),
    )
    with magic.Magic() as detector:
        description = detector.id_filename(self.get_path())
        logger.debug("Downloaded file type is: %s", description)
        lowered = description.lower()
        matches = [fmt for keyword, fmt in known_types if keyword in lowered]
        if matches:
            # First entry wins, same as scanning the table top to bottom.
            self.format = matches[0]
            return True
    return False
def download(self, task, url, parent_id=None):  # noqa
    """Download an image from ``url`` into this disk's path.

    The response is streamed to ``self.get_path()`` in 256 KiB chunks.
    URLs ending in ``.gz`` or ``.bz2`` are decompressed on the fly; a
    ``.zip`` download is handed to ``extract_iso_from_zip`` afterwards.
    Progress (``size`` and ``percent``) is reported through
    ``task.update_state`` each time the percentage grows.

    :param task: abortable task object; must provide ``is_aborted()``,
        ``update_state()``, ``AsyncResult()`` and ``request.id``.
    :param url: location of the image to download.
    :param parent_id: id of the task whose state is updated; defaults to
        ``task.request.id``.

    :raises Exception: if the HTTP status is not 200, or if the finished
        download is not recognised by ``check_valid_image`` (the file is
        removed first).

    If the task is aborted while downloading, the partial file is removed
    and the method returns without validating anything. Any other error
    during the transfer removes the file and is re-raised.
    """
    disk_path = self.get_path()
    logger.info("Downloading image from %s to %s", url, disk_path)
    r = requests.get(url, stream=True)
    if r.status_code != 200:
        raise Exception("Invalid response status code: %s" % r.status_code)

    class AbortException(Exception):
        pass

    if task.is_aborted():
        raise AbortException()
    if parent_id is None:
        parent_id = task.request.id
    chunk_size = 256 * 1024
    ext = url.split('.')[-1].lower()
    # Pick a streaming decompressor from the URL's extension; None means
    # the payload is written to disk unchanged.
    decompressor = None
    if ext == 'gz':
        # undocumented zlib feature http://stackoverflow.com/a/2424549
        decompressor = decompressobj(16 + MAX_WBITS)
    elif ext == 'bz2':
        decompressor = BZ2Decompressor()
    # Content-Length may be missing; fall back to a nominal size so the
    # percentage can still be computed, and never divide by zero.
    clen = max(int(r.headers.get('content-length', 700000000)), 1)
    percent = 0
    # Defined up front so the zip branch below works even when the
    # response body yields no chunks at all.
    actsize = 0
    try:
        with open(disk_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                # Keyed on the decompressor itself: the old test compared
                # against 'bz' and therefore never decompressed .bz2 files.
                if decompressor is not None:
                    chunk = decompressor.decompress(chunk)
                f.write(chunk)
                actsize = f.tell()
                new_percent = min(100, round(actsize * 100.0 / clen))
                if new_percent > percent:
                    percent = new_percent
                    if not task.is_aborted():
                        task.update_state(
                            task_id=parent_id,
                            state=task.AsyncResult(parent_id).state,
                            meta={'size': actsize, 'percent': percent})
                    else:
                        raise AbortException()
            # Only the zlib object has flush(); BZ2Decompressor does not.
            if ext == 'gz':
                f.write(decompressor.flush())
            f.flush()
        self.size = os.path.getsize(disk_path)
        logger.debug("Download finished %s (%s bytes)",
                     self.name, self.size)
    except AbortException:
        # Cleanup file:
        os.unlink(disk_path)
        logger.info("Download %s aborted %s removed.",
                    url, disk_path)
    except:  # noqa -- deliberate catch-all: clean up, then re-raise
        os.unlink(disk_path)
        logger.error("Download %s failed, %s removed.",
                     url, disk_path)
        raise
    else:
        if ext == 'zip' and is_zipfile(disk_path):
            task.update_state(
                task_id=parent_id,
                state=task.AsyncResult(parent_id).state,
                meta={'size': actsize, 'extracting': 'zip',
                      'percent': 99})
            self.extract_iso_from_zip(disk_path)
        if not self.check_valid_image():
            os.unlink(disk_path)
            raise Exception("Invalid file format. Only qcow and "
                            "iso files are allowed.")
def extract_iso_from_zip(self, disk_path): def extract_iso_from_zip(self, disk_path):
with ZipFile(disk_path, 'r') as z: with ZipFile(disk_path, 'r') as z:
......
...@@ -36,7 +36,8 @@ class download(AbortableTask): ...@@ -36,7 +36,8 @@ class download(AbortableTask):
parent_id = kwargs.get("parent_id", None) parent_id = kwargs.get("parent_id", None)
disk = Disk.deserialize(disk_desc) disk = Disk.deserialize(disk_desc)
disk.download(self, url, parent_id) disk.download(self, url, parent_id)
return disk.size return {'size': disk.size,
'type': disk.format}
@celery.task() @celery.task()
...@@ -89,7 +90,7 @@ def move_to_trash(datastore, disk_name): ...@@ -89,7 +90,7 @@ def move_to_trash(datastore, disk_name):
disk_path = path.join(datastore, disk_name) disk_path = path.join(datastore, disk_name)
if not path.isdir(trash_path): if not path.isdir(trash_path):
mkdir(trash_path) mkdir(trash_path)
#TODO: trash dir configurable? # TODO: trash dir configurable?
move(disk_path, trash_path) move(disk_path, trash_path)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment