Commit 944ddb8b by carpoon

update disk export methods

add retry top network copy
add rsync mode to export disk
add export disk to a datastore path
parent e52e3c02
......@@ -176,7 +176,7 @@ class Disk(object):
subprocess.check_output(cmdline)
def check_valid_image(self):
"""Check wether the downloaded image is valid.
"""Check whether the downloaded image is valid.
Set the proper type for valid images."""
format_map = [
("qcow", "qcow2-norm"),
......@@ -304,16 +304,45 @@ class Disk(object):
raise Exception("Invalid file format. Only qcow and "
"iso files are allowed. Image from: %s" % url)
def import_disk(self, task, url, port=22):
def import_disk(self, task, url, port=22, identity=None, mode="scp"):
logger.debug("Start disk import...")
downloaded_file = os.path.join(self.dir, re.split('[:/]', url)[-1])
cmdline = ['scp', '-B', '-P', str(port), url, downloaded_file]
if mode == "scp":
cmdline = ['scp', '-B', '-P', str(port), url, downloaded_file]
if identity is not None and identity != "":
cmdline.append("-i")
cmdline.append(identity)
elif mode == "rsync":
cmdline = ["rsync", "-qLS", url, downloaded_file]
cmdline.append("-e")
if identity is not None:
cmdline.append("ssh -i %s -p %s" % (identity, str(port)))
else:
cmdline.append("ssh -p %s" % str(port))
else:
logger.error("Invalid mode for disk export: %s" % mode)
raise AbortException()
logger.debug("Calling file transfer with command line: %s" % str(cmdline))
proc = subprocess.Popen(cmdline)
try:
while proc.poll() is None:
if task.is_aborted():
raise AbortException()
sleep(2)
# let's try the file transfer 5 times, it may be an intermittent network issue
for i in range(4, -1, -1):
proc = subprocess.Popen(cmdline)
while proc.poll() is None:
if task.is_aborted():
raise AbortException()
sleep(2)
if proc.returncode == 0:
break
else:
logger.error("Copy over ssh failed with return code: %s, will try %s more time(s)..." %
(str(proc.returncode), str(i)))
if proc.stdout is not None:
logger.info(proc.stdout.read())
if proc.stdout is not None:
logger.error(proc.stderr.read())
if task.is_aborted():
raise AbortException()
......@@ -324,11 +353,13 @@ class Disk(object):
if 'qcow' in ftype.lower():
move(downloaded_file, self.get_path())
else:
logger.debug("Calling qemu-img with command line: %s" % str(cmdline))
cmdline = ['ionice', '-c', 'idle',
'qemu-img', 'convert',
'-m', '4', '-O', 'qcow2',
downloaded_file,
self.get_path()]
logger.debug("Calling qemu-img with command line: %s" % str(cmdline))
subprocess.check_output(cmdline)
except AbortException:
if os.path.exists(downloaded_file):
......@@ -352,25 +383,60 @@ class Disk(object):
self.size = Disk.get(self.dir, self.name).size
def export(self, task, disk_format, upload_link, port=22):
def export_disk_to_datastore(self, task, disk_format, datastore):
logger.debug("Start disk export to datastore...")
exported_path = os.path.join(datastore, "exports", os.path.basename(self.get_path()) + '.' + disk_format)
cmdline = ['ionice', '-c', 'idle', 'qemu-img', 'convert']
if disk_format == 'qcow2' or disk_format == 'qcow':
cmdline.append('-c')
cmdline.extend(['-m', '4', '-O', disk_format, self.get_path(), exported_path])
logger.debug("Calling qemu-img with command line: %s" % str(cmdline))
subprocess.check_output(cmdline)
def export_disk(self, task, disk_format, upload_link, port=22, identity=None, mode="scp"):
logger.debug("Start disk export...")
exported_path = self.get_path() + '.' + disk_format
cmdline = ['ionice', '-c', 'idle',
'qemu-img', 'convert']
if disk_format == 'qcow2':
cmdline = ['ionice', '-c', 'idle', 'qemu-img', 'convert']
if disk_format == 'qcow2' or disk_format == 'qcow':
cmdline.append('-c')
cmdline.extend(['-m', '4', '-O', disk_format,
self.get_path(),
exported_path])
cmdline.extend(['-m', '4', '-O', disk_format, self.get_path(), exported_path])
logger.debug("Calling qemu-img with command line: %s" % str(cmdline))
subprocess.check_output(cmdline)
cmdline = ['scp', '-B', '-P', str(port), exported_path, upload_link]
if mode == "scp":
cmdline = ['scp', '-B', '-P', str(port), exported_path, upload_link]
if identity is not None and identity != "":
cmdline.append("-i")
cmdline.append(identity)
elif mode == "rsync":
cmdline = ["rsync", "-qSL", exported_path, upload_link]
cmdline.append("-e")
if identity is not None:
cmdline.append("ssh -i %s -p %s" % (identity, str(port)))
else:
cmdline.append("ssh -p %s" % str(port))
else:
logger.error("Invalid mode for disk export: %s" % mode)
raise AbortException()
proc = subprocess.Popen(cmdline)
logger.debug("Calling file transfer with command line. %s" % str(cmdline))
try:
while proc.poll() is None:
if task.is_aborted():
raise AbortException()
sleep(2)
# let's try the file transfer 5 times, it may be an intermittent network issue
for i in range(4, -1, -1):
proc = subprocess.Popen(cmdline)
while proc.poll() is None:
if task.is_aborted():
raise AbortException()
sleep(2)
if proc.returncode == 0:
break
else:
logger.error("Copy over ssh failed with return code: %s, will try %s more time(s)..." %
(str(proc.returncode), str(i)))
if proc.stdout is not None:
logger.info(proc.stdout.read())
if proc.stdout is not None:
logger.error(proc.stderr.read())
except AbortException:
proc.terminate()
logger.info("Export of disk %s aborted" % self.name)
......
......@@ -65,8 +65,10 @@ class import_disk(AbortableTask):
disk_desc = kwargs["disk_desc"]
url = kwargs["url"]
port = kwargs["port"]
store_identity_file = kwargs["store_identity_file"]
store_ssh_mode = kwargs["store_ssh_mode"]
disk = Disk.deserialize(disk_desc)
disk.import_disk(self, url, port)
disk.import_disk(self, url, port, store_identity_file, store_ssh_mode)
return {
"size": disk.size,
"checksum": disk.checksum
......@@ -81,8 +83,21 @@ class export_disk(AbortableTask):
disk_format = kwargs["disk_format"]
upload_link = kwargs["upload_link"]
port = kwargs["port"]
store_identity_file = kwargs["store_identity_file"]
store_ssh_mode = kwargs["store_ssh_mode"]
disk = Disk.deserialize(disk_desc)
return disk.export(self, disk_format, upload_link, port)
return disk.export_disk(self, disk_format, upload_link, port, store_identity_file, store_ssh_mode)
class export_disk_to_datastore(AbortableTask):
time_limit = 18000
def run(self, **kwargs):
disk_desc = kwargs["disk_desc"]
disk_format = kwargs["disk_format"]
datastore = kwargs["datastore"]
disk = Disk.deserialize(disk_desc)
return disk.export_disk_to_datastore(self, disk_format, datastore)
@celery.task()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment