Commit 2b4ce60c by Szeberényi Imre

Merge branch 'export_progress' into 'master'

Export and import disk progress

See merge request !19
parents 8f4918e4 7af8d98d
import shutil
import json import json
import os import os
import subprocess import subprocess
import logging import logging
...@@ -13,7 +12,7 @@ from time import sleep ...@@ -13,7 +12,7 @@ from time import sleep
from hashlib import md5 from hashlib import md5
import re import re
from requests_toolbelt import MultipartEncoder from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
import requests import requests
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -268,18 +267,32 @@ class Disk(object): ...@@ -268,18 +267,32 @@ class Disk(object):
raise Exception("Invalid file format. Only qcow and " raise Exception("Invalid file format. Only qcow and "
"iso files are allowed. Image from: %s" % url) "iso files are allowed. Image from: %s" % url)
def import_disk(self, url): def import_disk(self, task, url, parent_id):
r = requests.get(url, stream=True) r = requests.get(url, stream=True)
clen = int(r.headers.get('content-length'))
downloaded_file = os.path.join(self.dir, url.split('/')[-1]) downloaded_file = os.path.join(self.dir, url.split('/')[-1])
percent = 0
try:
with open(downloaded_file, 'wb') as f: with open(downloaded_file, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192): for chunk in r.iter_content(chunk_size=256 * 1024):
f.write(chunk) f.write(chunk)
current_size = f.tell()
new_percent = current_size * 100 / clen
if task.is_aborted():
raise AbortException()
if new_percent > percent:
percent = new_percent
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'percent': percent}
)
with magic.Magic() as m: with magic.Magic() as m:
ftype = m.id_filename(downloaded_file) ftype = m.id_filename(downloaded_file)
if 'qcow' in ftype.lower(): if 'qcow' in ftype.lower():
shutil.move(downloaded_file, self.get_path()) move(downloaded_file, self.get_path())
else: else:
cmdline = ['qemu-img', cmdline = ['qemu-img',
'convert', 'convert',
...@@ -287,7 +300,17 @@ class Disk(object): ...@@ -287,7 +300,17 @@ class Disk(object):
downloaded_file, downloaded_file,
self.get_path()] self.get_path()]
subprocess.check_output(cmdline) subprocess.check_output(cmdline)
except AbortException:
os.unlink(downloaded_file)
if os.path.exists(self.get_path()):
os.unlink(self.get_path())
logger.info("Import of disk %s aborted" % self.name)
except:
os.unlink(downloaded_file)
if os.path.exists(self.get_path()):
os.unlink(self.get_path())
raise
else:
os.unlink(downloaded_file) os.unlink(downloaded_file)
if not self.check_valid_image(): if not self.check_valid_image():
...@@ -296,22 +319,45 @@ class Disk(object): ...@@ -296,22 +319,45 @@ class Disk(object):
self.size = Disk.get(self.dir, self.name).size self.size = Disk.get(self.dir, self.name).size
def export(self, format, exported_name, upload_link): def export(self, task, disk_format, exported_name, upload_link, parent_id):
exported_path = self.get_path() + '.' + format exported_path = self.get_path() + '.' + disk_format
cmdline = ['qemu-img', cmdline = ['qemu-img',
'convert', 'convert',
'-O', format, '-O', disk_format,
self.get_path(), self.get_path(),
exported_path] exported_path]
subprocess.check_output(cmdline) subprocess.check_output(cmdline)
size = os.path.getsize(exported_path)
percent = [0]
def update_state(monitor):
new_percent = monitor.bytes_read * 100 / size
if task.is_aborted():
raise AbortException()
if new_percent > percent[0]:
percent[0] = new_percent
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'percent': percent[0]}
)
with open(exported_path, 'rb') as exported_disk: with open(exported_path, 'rb') as exported_disk:
try: try:
m = MultipartEncoder( e = MultipartEncoder(
{'data': (exported_name + '.' + format, {'data': (exported_name + '.' + disk_format,
exported_disk)} exported_disk,
'application/octet-stream')}
) )
m = MultipartEncoderMonitor(e, update_state)
# Force the read function to read more than 8192 bytes,
# which is a hardcoded value in httplib. This increases
# the upload speed. See:
# https://github.com/requests/toolbelt/issues/75
m._read = m.read
m.read = lambda _: m._read(1024 * 1024)
response = requests.post( response = requests.post(
upload_link, upload_link,
data=m, data=m,
......
...@@ -41,17 +41,32 @@ class download(AbortableTask): ...@@ -41,17 +41,32 @@ class download(AbortableTask):
'checksum': disk.checksum, } 'checksum': disk.checksum, }
@celery.task() class import_disk(AbortableTask):
def import_disk(disk_desc, url): time_limit = 18000
def run(self, **kwargs):
disk_desc = kwargs["disk_desc"]
url = kwargs["url"]
parent_id = kwargs["task"]
disk = Disk.deserialize(disk_desc) disk = Disk.deserialize(disk_desc)
disk.import_disk(url) disk.import_disk(self, url, parent_id)
return disk.size return {
"size": disk.size,
"checksum": disk.checksum
}
@celery.task() class export_disk(AbortableTask):
def export(disk_desc, format, exported_name, upload_link): time_limit = 18000
def run(self, **kwargs):
disk_desc = kwargs["disk_desc"]
disk_format = kwargs["disk_format"]
exported_name = kwargs["exported_name"]
upload_link = kwargs["upload_link"]
parent_id = kwargs["task"]
disk = Disk.deserialize(disk_desc) disk = Disk.deserialize(disk_desc)
disk.export(format, exported_name, upload_link) disk.export(self, disk_format, exported_name, upload_link, parent_id)
@celery.task() @celery.task()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment