Commit 5bee7535 by Szeberényi Imre

Merge branch 'ssh_export_import' into 'master'

Export and import disks via SSH

See merge request !21
parents 5997c4b1 2b90e13c
import json
import os
import subprocess
import logging
import magic
import subprocess
from hashlib import md5
from shutil import move, copyfileobj
from zipfile import ZipFile, is_zipfile
from zlib import decompressobj, MAX_WBITS
from bz2 import BZ2Decompressor
from time import sleep
from hashlib import md5
import re
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
import magic
import os
import re
import requests
from bz2 import BZ2Decompressor
from time import sleep
from zlib import decompressobj, MAX_WBITS
logger = logging.getLogger(__name__)
......@@ -267,26 +265,19 @@ class Disk(object):
raise Exception("Invalid file format. Only qcow and "
"iso files are allowed. Image from: %s" % url)
def import_disk(self, task, url, parent_id):
r = requests.get(url, stream=True)
clen = int(r.headers.get('content-length'))
downloaded_file = os.path.join(self.dir, url.split('/')[-1])
percent = 0
def import_disk(self, task, url, port=22):
downloaded_file = os.path.join(self.dir, re.split('[:/]', url)[-1])
cmdline = ['scp', '-B', '-P', str(port), url, downloaded_file]
proc = subprocess.Popen(cmdline)
try:
with open(downloaded_file, 'wb') as f:
for chunk in r.iter_content(chunk_size=256 * 1024):
f.write(chunk)
current_size = f.tell()
new_percent = current_size * 100 / clen
if new_percent > percent:
if task.is_aborted():
raise AbortException()
percent = new_percent
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'size': current_size, 'percent': percent}
)
while proc.poll() is None:
if task.is_aborted():
raise AbortException()
sleep(2)
if task.is_aborted():
raise AbortException()
with magic.Magic() as m:
ftype = m.id_filename(downloaded_file)
......@@ -294,9 +285,8 @@ class Disk(object):
if 'qcow' in ftype.lower():
move(downloaded_file, self.get_path())
else:
cmdline = ['ionice', '-c', 'idle',
'qemu-img',
'convert',
cmdline = ['ionice', '-c', 'idle',
'qemu-img', 'convert',
'-m', '4', '-O', 'qcow2',
downloaded_file,
self.get_path()]
......@@ -323,59 +313,32 @@ class Disk(object):
self.size = Disk.get(self.dir, self.name).size
def export(self, task, disk_format, exported_name, upload_link, parent_id):
def export(self, task, disk_format, upload_link, port=22):
exported_path = self.get_path() + '.' + disk_format
cmdline = ['ionice', '-c', 'idle',
'qemu-img',
'convert']
if disk_format == 'qcow2':
cmdline += ['-c']
cmdline += ['-m', '4', '-O', disk_format,
self.get_path(),
exported_path]
'qemu-img', 'convert']
if disk_format == 'qcow2':
cmdline.append('-c')
cmdline.extend(['-m', '4', '-O', disk_format,
self.get_path(),
exported_path])
subprocess.check_output(cmdline)
size = os.path.getsize(exported_path)
percent = [0]
cmdline = ['scp', '-B', '-P', str(port), exported_path, upload_link]
def update_state(monitor):
new_percent = monitor.bytes_read * 100 / size
if new_percent > percent[0]:
proc = subprocess.Popen(cmdline)
try:
while proc.poll() is None:
if task.is_aborted():
raise AbortException()
percent[0] = new_percent
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'size': monitor.bytes_read, 'percent': percent[0]}
)
with open(exported_path, 'rb') as exported_disk:
try:
e = MultipartEncoder(
{'data': (exported_name + '.' + disk_format,
exported_disk,
'application/octet-stream')}
)
m = MultipartEncoderMonitor(e, update_state)
# Force the read function to read more than 8192 bytes,
# which is a hardcoded value in httplib. This increases
# the upload speed. See:
# https://github.com/requests/toolbelt/issues/75
m._read = m.read
m.read = lambda _: m._read(1024 * 1024)
response = requests.post(
upload_link,
data=m,
headers={'Content-Type': m.content_type},
params={'no_redirect': ''}
)
if response.status_code != 200:
raise Exception("Invalid response status code: %s" %
response.status_code)
finally:
os.unlink(exported_path)
sleep(2)
except AbortException:
proc.terminate()
logger.info("Export of disk %s aborted" % self.name)
finally:
os.unlink(exported_path)
return os.path.basename(exported_path)
def extract_iso_from_zip(self, disk_path):
with ZipFile(disk_path, 'r') as z:
......@@ -414,8 +377,7 @@ class Disk(object):
raise NotImplemented()
else:
cmdline = ['ionice', '-c', 'idle',
'qemu-img',
'create',
'qemu-img', 'create',
'-b', self.get_base(),
'-f', self.format,
self.get_path()]
......@@ -425,10 +387,12 @@ class Disk(object):
def merge_disk_with_base(self, task, new_disk, parent_id=None):
proc = None
try:
cmdline = ['ionice', '-c', 'idle', 'qemu-img', 'convert', '-m', '4']
if new_disk.format == 'qcow2':
cmdline += ['-c']
cmdline += ['-O', new_disk.format, self.get_path(), new_disk.get_path()]
cmdline = ['ionice', '-c', 'idle',
'qemu-img', 'convert', '-m', '4']
if new_disk.format == 'qcow2':
cmdline.append('-c')
cmdline.extend(['-O', new_disk.format,
self.get_path(), new_disk.get_path()])
# Call subprocess
logger.debug(
"Merging %s into %s. %s", self.get_path(),
......
celery==3.1.17
requests==2.5.3
requests-toolbelt==0.9.1
filemagic==1.6
from disk import Disk
from storagecelery import celery
import logging
from os import path, unlink, statvfs, listdir, mkdir
from shutil import move
from celery.contrib.abortable import AbortableTask
import logging
logger = logging.getLogger(__name__)
from disk import Disk
from storagecelery import celery
logger = logging.getLogger(__name__)
trash_directory = "trash"
......@@ -47,9 +48,9 @@ class import_disk(AbortableTask):
def run(self, **kwargs):
disk_desc = kwargs["disk_desc"]
url = kwargs["url"]
parent_id = kwargs["task"]
port = kwargs["port"]
disk = Disk.deserialize(disk_desc)
disk.import_disk(self, url, parent_id)
disk.import_disk(self, url, port)
return {
"size": disk.size,
"checksum": disk.checksum
......@@ -62,11 +63,10 @@ class export_disk(AbortableTask):
def run(self, **kwargs):
disk_desc = kwargs["disk_desc"]
disk_format = kwargs["disk_format"]
exported_name = kwargs["exported_name"]
upload_link = kwargs["upload_link"]
parent_id = kwargs["task"]
port = kwargs["port"]
disk = Disk.deserialize(disk_desc)
disk.export(self, disk_format, exported_name, upload_link, parent_id)
return disk.export(self, disk_format, upload_link, port)
@celery.task()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment