Commit 4b88e9f4 by Bálint Máhonfai

Implement uploading exported disk to storeserver, remove disk from the datastore after upload

parent 06326eea
...@@ -11,6 +11,7 @@ from time import sleep ...@@ -11,6 +11,7 @@ from time import sleep
from hashlib import md5 from hashlib import md5
import re import re
from requests_toolbelt import MultipartEncoder
import requests import requests
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -265,89 +266,152 @@ class Disk(object): ...@@ -265,89 +266,152 @@ class Disk(object):
raise Exception("Invalid file format. Only qcow and " raise Exception("Invalid file format. Only qcow and "
"iso files are allowed. Image from: %s" % url) "iso files are allowed. Image from: %s" % url)
def export(self, format): def export(self, format, exported_name, upload_link):
format_dict = { format_dict = {
'vmdk': 'vmdk', 'vmdk': 'vmdk',
'qcow2': 'qcow2', 'qcow2': 'qcow2',
'vdi': 'vdi', 'vdi': 'vdi',
'vpc': 'vhd', 'vpc': 'vhd',
} }
exported_path = self.get_path() + '.' + format_dict[format]
cmdline = ['qemu-img', cmdline = ['qemu-img',
'convert', 'convert',
'-O', format, '-O', format,
self.get_path(), self.get_path(),
self.get_path() + '.' + format_dict[format]] exported_path]
subprocess.check_output(cmdline) subprocess.check_output(cmdline)
def extract_iso_from_zip(self, disk_path): with open(exported_path, 'rb') as exported_disk:
with ZipFile(disk_path, 'r') as z: try:
isos = z.namelist() m = MultipartEncoder(
if len(isos) != 1: {'data': (exported_name + '.' + format_dict[format], exported_disk)}
isos = [i for i in isos )
if i.lower().endswith('.iso')] response = requests.post(upload_link,
if len(isos) == 1: data=m,
logger.info('Unzipping %s started.', disk_path) headers={'Content-Type': m.content_type},
f = open(disk_path + '~', 'wb') params={'no_redirect': ''})
zf = z.open(isos[0]) if response.status_code != 200:
with zf, f: raise Exception("Invalid response status code: %s" %
copyfileobj(zf, f) response.status_code)
f.flush() finally:
move(disk_path + '~', disk_path) os.unlink(exported_path)
else:
logger.info("Extracting %s failed, keeping original.",
disk_path) def extract_iso_from_zip(self, disk_path):
with ZipFile(disk_path, 'r') as z:
def snapshot(self): isos = z.namelist()
""" Creating qcow2 snapshot with base image. if len(isos) != 1:
""" isos = [i for i in isos
# Check if snapshot type and qcow2 format matchmatch if i.lower().endswith('.iso')]
if self.type != 'snapshot': if len(isos) == 1:
raise Exception('Invalid type: %s' % self.type) logger.info('Unzipping %s started.', disk_path)
# Check if file already exists f = open(disk_path + '~', 'wb')
if os.path.isfile(self.get_path()): zf = z.open(isos[0])
raise Exception('File already exists: %s' % self.get_path()) with zf, f:
# Check if base file exist copyfileobj(zf, f)
if not os.path.isfile(self.get_base()): f.flush()
raise Exception('Image Base does not exists: %s' % self.get_base()) move(disk_path + '~', disk_path)
# Build list of Strings as command parameters
if self.format == 'iso':
os.symlink(self.get_base(), self.get_path())
elif self.format == 'raw':
raise NotImplemented()
else: else:
cmdline = ['qemu-img', logger.info("Extracting %s failed, keeping original.",
'create', disk_path)
'-b', self.get_base(),
'-f', self.format,
self.get_path()] def snapshot(self):
# Call subprocess """ Creating qcow2 snapshot with base image.
subprocess.check_output(cmdline) """
# Check if snapshot type and qcow2 format matchmatch
def merge_disk_with_base(self, task, new_disk, parent_id=None): if self.type != 'snapshot':
proc = None raise Exception('Invalid type: %s' % self.type)
try: # Check if file already exists
cmdline = [ if os.path.isfile(self.get_path()):
'qemu-img', 'convert', self.get_path(), raise Exception('File already exists: %s' % self.get_path())
'-O', new_disk.format, new_disk.get_path()] # Check if base file exist
# Call subprocess if not os.path.isfile(self.get_base()):
logger.debug( raise Exception('Image Base does not exists: %s' % self.get_base())
"Merging %s into %s.", self.get_path(), # Build list of Strings as command parameters
new_disk.get_path()) if self.format == 'iso':
percent = 0 os.symlink(self.get_base(), self.get_path())
diff_disk = Disk.get(self.dir, self.name) elif self.format == 'raw':
base_disk = Disk.get(self.dir, self.base_name) raise NotImplemented()
clen = min(base_disk.actual_size + diff_disk.actual_size, else:
diff_disk.size) cmdline = ['qemu-img',
output = new_disk.get_path() 'create',
proc = subprocess.Popen(cmdline) '-b', self.get_base(),
'-f', self.format,
self.get_path()]
# Call subprocess
subprocess.check_output(cmdline)
def merge_disk_with_base(self, task, new_disk, parent_id=None):
proc = None
try:
cmdline = [
'qemu-img', 'convert', self.get_path(),
'-O', new_disk.format, new_disk.get_path()]
# Call subprocess
logger.debug(
"Merging %s into %s.", self.get_path(),
new_disk.get_path())
percent = 0
diff_disk = Disk.get(self.dir, self.name)
base_disk = Disk.get(self.dir, self.base_name)
clen = min(base_disk.actual_size + diff_disk.actual_size,
diff_disk.size)
output = new_disk.get_path()
proc = subprocess.Popen(cmdline)
while True:
if proc.poll() is not None:
break
try:
actsize = os.path.getsize(output)
except OSError:
actsize = 0
new_percent = min(100, round(actsize * 100.0 / clen))
if new_percent > percent:
percent = new_percent
if not task.is_aborted():
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'size': actsize, 'percent': percent})
else:
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
sleep(1)
except AbortException:
proc.terminate()
logger.warning("Aborted merge job, removing %s",
new_disk.get_path())
os.unlink(new_disk.get_path())
except:
if proc:
proc.terminate()
logger.exception("Unknown error occured, removing %s ",
new_disk.get_path())
os.unlink(new_disk.get_path())
raise
def merge_disk_without_base(self, task, new_disk, parent_id=None,
length=1024 * 1024):
try:
fsrc = open(self.get_path(), 'rb')
fdst = open(new_disk.get_path(), 'wb')
clen = self.size
actsize = 0
percent = 0
with fsrc, fdst:
while True: while True:
if proc.poll() is not None: buf = fsrc.read(length)
if not buf:
break break
try: fdst.write(buf)
actsize = os.path.getsize(output) actsize += len(buf)
except OSError:
actsize = 0
new_percent = min(100, round(actsize * 100.0 / clen)) new_percent = min(100, round(actsize * 100.0 / clen))
if new_percent > percent: if new_percent > percent:
percent = new_percent percent = new_percent
...@@ -361,82 +425,42 @@ class Disk(object): ...@@ -361,82 +425,42 @@ class Disk(object):
"Merging new disk %s is aborted by user.", "Merging new disk %s is aborted by user.",
new_disk.get_path()) new_disk.get_path())
raise AbortException() raise AbortException()
sleep(1) except AbortException:
except AbortException: logger.warning("Aborted remove %s", new_disk.get_path())
proc.terminate() os.unlink(new_disk.get_path())
logger.warning("Aborted merge job, removing %s", except:
new_disk.get_path()) logger.exception("Unknown error occured removing %s ",
os.unlink(new_disk.get_path()) new_disk.get_path())
os.unlink(new_disk.get_path())
except: raise
if proc:
proc.terminate()
logger.exception("Unknown error occured, removing %s ", def merge(self, task, new_disk, parent_id=None):
new_disk.get_path()) """ Merging a new_disk from the actual disk and its base.
os.unlink(new_disk.get_path()) """
raise
def merge_disk_without_base(self, task, new_disk, parent_id=None, if task.is_aborted():
length=1024 * 1024): raise AbortException()
try:
fsrc = open(self.get_path(), 'rb')
fdst = open(new_disk.get_path(), 'wb')
clen = self.size
actsize = 0
percent = 0
with fsrc, fdst:
while True:
buf = fsrc.read(length)
if not buf:
break
fdst.write(buf)
actsize += len(buf)
new_percent = min(100, round(actsize * 100.0 / clen))
if new_percent > percent:
percent = new_percent
if not task.is_aborted():
task.update_state(
task_id=parent_id,
state=task.AsyncResult(parent_id).state,
meta={'size': actsize, 'percent': percent})
else:
logger.warning(
"Merging new disk %s is aborted by user.",
new_disk.get_path())
raise AbortException()
except AbortException:
logger.warning("Aborted remove %s", new_disk.get_path())
os.unlink(new_disk.get_path())
except:
logger.exception("Unknown error occured removing %s ",
new_disk.get_path())
os.unlink(new_disk.get_path())
raise
def merge(self, task, new_disk, parent_id=None): # Check if file already exists
""" Merging a new_disk from the actual disk and its base. if os.path.isfile(new_disk.get_path()):
""" raise Exception('File already exists: %s' % self.get_path())
if task.is_aborted(): if self.format == "iso":
raise AbortException() os.symlink(self.get_path(), new_disk.get_path())
elif self.base_name:
self.merge_disk_with_base(task, new_disk, parent_id)
else:
self.merge_disk_without_base(task, new_disk, parent_id)
# Check if file already exists
if os.path.isfile(new_disk.get_path()):
raise Exception('File already exists: %s' % self.get_path())
if self.format == "iso": def delete(self):
os.symlink(self.get_path(), new_disk.get_path()) """ Delete file. """
elif self.base_name: if os.path.isfile(self.get_path()):
self.merge_disk_with_base(task, new_disk, parent_id) os.unlink(self.get_path())
else:
self.merge_disk_without_base(task, new_disk, parent_id)
def delete(self):
""" Delete file. """
if os.path.isfile(self.get_path()):
os.unlink(self.get_path())
@classmethod @classmethod
def list(cls, dir): def list(cls, dir):
""" List all files in <dir> directory.""" """ List all files in <dir> directory."""
return [cls.get(dir, file) for file in os.listdir(dir)] return [cls.get(dir, file) for file in os.listdir(dir)]
celery==3.1.17 celery==3.1.17
requests==2.5.3 requests==2.5.3
requests-toolbelt==0.9.1
filemagic==1.6 filemagic==1.6
...@@ -42,9 +42,9 @@ class download(AbortableTask): ...@@ -42,9 +42,9 @@ class download(AbortableTask):
@celery.task() @celery.task()
def export(disk_desc, format): def export(disk_desc, format, exported_name, upload_link):
disk = Disk.deserialize(disk_desc) disk = Disk.deserialize(disk_desc)
disk.export(format) disk.export(format, exported_name, upload_link)
@celery.task() @celery.task()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment