Commit 44c50b76 by Máhonfai Bálint

Implement disk export with plain SCP instead of paramiko

Paramiko and other tested Python SSH libraries are all very slow compared to the scp command, so implement this feature without a library.
parent 73d6473c
......@@ -7,7 +7,6 @@ from zipfile import ZipFile, is_zipfile
import magic
import os
import paramiko
import re
import requests
from bz2 import BZ2Decompressor
......@@ -322,7 +321,7 @@ class Disk(object):
self.size = Disk.get(self.dir, self.name).size
def export(self, task, disk_format, exported_name, upload_link, parent_id):
def export(self, task, disk_format, upload_link, port=None):
exported_path = self.get_path() + '.' + disk_format
cmdline = ['ionice', '-c', 'idle',
'qemu-img', 'convert']
......@@ -331,49 +330,26 @@ class Disk(object):
cmdline.extend(['-m', '4', '-O', disk_format,
self.get_path(),
exported_path])
subprocess.check_output(cmdline)
size = os.path.getsize(exported_path)
percent = [0]
cmdline = ['scp', '-B']
if port is not None:
cmdline.extend(['-P', str(port)])
cmdline.extend([exported_path, upload_link])
def update_state(monitor):
new_percent = monitor.bytes_read * 100 / size
if new_percent > percent[0]:
proc = subprocess.Popen(cmdline)
try:
while proc.poll() is None:
if task.is_aborted():
raise AbortException()
percent[0] = new_percent
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'size': monitor.bytes_read, 'percent': percent[0]}
)
with open(exported_path, 'rb') as exported_disk:
try:
e = MultipartEncoder(
{'data': (exported_name + '.' + disk_format,
exported_disk,
'application/octet-stream')}
)
m = MultipartEncoderMonitor(e, update_state)
# Force the read function to read more than 8192 bytes,
# which is a hardcoded value in httplib. This increases
# the upload speed. See:
# https://github.com/requests/toolbelt/issues/75
m._read = m.read
m.read = lambda _: m._read(1024 * 1024)
response = requests.post(
upload_link,
data=m,
headers={'Content-Type': m.content_type},
params={'no_redirect': ''}
)
if response.status_code != 200:
raise Exception("Invalid response status code: %s" %
response.status_code)
finally:
os.unlink(exported_path)
sleep(1)
except AbortException:
proc.terminate()
logger.info("Export of disk %s aborted" % self.name)
finally:
os.unlink(exported_path)
return os.path.basename(exported_path)
def extract_iso_from_zip(self, disk_path):
with ZipFile(disk_path, 'r') as z:
......
......@@ -63,11 +63,13 @@ class export_disk(AbortableTask):
def run(self, **kwargs):
disk_desc = kwargs["disk_desc"]
disk_format = kwargs["disk_format"]
exported_name = kwargs["exported_name"]
upload_link = kwargs["upload_link"]
parent_id = kwargs["task"]
if "port" in kwargs:
port = kwargs["port"]
else:
port = None
disk = Disk.deserialize(disk_desc)
disk.export(self, disk_format, exported_name, upload_link, parent_id)
return disk.export(self, disk_format, upload_link, port)
@celery.task()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment