disk.py 18.8 KB
Newer Older
1
import json
Guba Sándor committed
2
import logging
Máhonfai Bálint committed
3 4
import subprocess
from hashlib import md5
Bach Dániel committed
5
from shutil import move, copyfileobj
Őry Máté committed
6
from zipfile import ZipFile, is_zipfile
7

Máhonfai Bálint committed
8 9 10
import magic
import os
import re
11
import requests
Máhonfai Bálint committed
12 13 14 15
from bz2 import BZ2Decompressor
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from time import sleep
from zlib import decompressobj, MAX_WBITS
16 17

logger = logging.getLogger(__name__)

# Line-by-line parser for the plain-text ``qemu-img info`` output consumed by
# Disk.get_legacy. A matching line fills exactly one named group:
#   format -- image format (only qcow2 or raw are recognised)
#   size   -- virtual size in bytes
#   base   -- resolved path of the backing file
re_qemu_img = re.compile(
    r'(file format: (?P<format>(qcow2|raw))'
    r'|virtual size: \w+ \((?P<size>[0-9]+) bytes\)'
    r'|backing file: \S+ \(actual path: (?P<base>\S+)\))$'
)

# Upper bound, in bytes, for downloaded images (default: 10 GiB).
# Can be overridden through the DOWNLOAD_MAX_SIZE environment variable.
maximum_size = float(os.getenv("DOWNLOAD_MAX_SIZE", 10 * 1024 ** 3))


class AbortException(Exception):
    """Signals that the controlling task reported ``is_aborted()``."""


class FileTooBig(Exception):
    """Signals that a download exceeds ``maximum_size`` bytes."""


class Disk(object):
    """Storage driver DISK object.

    Handles qcow2, raw and iso images stored as ``<dir>/<name>``.
    TYPES, FORMATS and CREATE_FORMATS are hand-managed restrictions:
    FORMATS and TYPES are validated on construction, CREATE_FORMATS limits
    what :meth:`create` may build.
    """

    TYPES = ['snapshot', 'normal']
    FORMATS = ['qcow2', 'raw', 'iso']
    CREATE_FORMATS = ['qcow2', 'raw']

    def __init__(self, dir, name, format, type, size,
                 base_name, actual_size=0):
        """Validate and store a disk description.

        :param dir: directory of the image (stored resolved via realpath).
        :param name: file name of the image inside ``dir``.
        :param format: one of FORMATS.
        :param type: one of TYPES.
        :param size: virtual size; a value ``int()`` rejects is stored as None.
        :param base_name: file name of the backing image in ``dir``, or None.
        :param actual_size: stored unchanged.
        :raises Exception: on an unknown format or type.
        """
        # TODO: tests
        self.name = name
        self.dir = os.path.realpath(dir)
        if format not in self.FORMATS:
            raise Exception('Invalid format: %s' % format)
        self.format = format
        if type not in self.TYPES:
            raise Exception('Invalid type: %s' % type)
        self.type = type
        try:
            self.size = int(size)
        except (TypeError, ValueError, OverflowError):
            self.size = None
        self.actual_size = actual_size
        self.base_name = base_name

    @property
    def checksum(self, blocksize=65536):
        """MD5 hex digest of the image file's contents.

        Being a property, ``blocksize`` always takes its default value.
        """
        digest = md5()
        with open(self.get_path(), "rb") as f:
            # b"" is the EOF marker of binary reads on Python 2 and 3; a text
            # "" never equals b"" on Python 3 and would loop forever.
            for block in iter(lambda: f.read(blocksize), b""):
                digest.update(block)
        return digest.hexdigest()

    @classmethod
    def deserialize(cls, desc):
        """Create cls object from a JSON string or an already decoded dict."""
        logger.info(desc)
        try:
            string_types = basestring  # noqa: F821 -- Python 2 only
        except NameError:
            string_types = (str, bytes)
        if isinstance(desc, string_types):
            desc = json.loads(desc)
        return cls(**desc)

    def get_desc(self):
        """Create dict from Disk object (the inverse of deserialize)."""
        return {
            'name': self.name,
            'dir': self.dir,
            'format': self.format,
            'type': self.type,
            'size': self.size,
            'actual_size': self.actual_size,
            'base_name': self.base_name,
        }

    def get_path(self):
        """Get absolute path for disk."""
        return os.path.realpath(self.dir + '/' + self.name)

    def get_base(self):
        """Get absolute path for disk's base image (requires base_name)."""
        return os.path.realpath(self.dir + '/' + self.base_name)

    def __unicode__(self):
        # Disks without a backing image have base_name None; get_base()
        # would fail on the string concatenation, so print None instead.
        base = self.get_base() if self.base_name is not None else None
        return u'%s %s %s %s' % (self.get_path(), self.format,
                                 self.size, base)

    @classmethod
    def get_legacy(cls, dir, name):
        """Create disk from path by parsing plain-text ``qemu-img info``.

        :raises subprocess.CalledProcessError: if qemu-img fails.
        :raises Exception: if no recognised format line is found
            ("Invalid format: None").
        """
        path = os.path.realpath(dir + '/' + name)
        # universal_newlines gives str output on both Python 2 and 3.
        output = subprocess.check_output(
            ['qemu-img', 'info', '--force', path], universal_newlines=True)

        type = 'normal'
        format = None
        size = None
        base_name = None
        for line in output.splitlines():
            m = re_qemu_img.search(line)
            if not m:
                continue
            res = m.groupdict()
            if res.get('format') is not None:
                format = res['format']
            if res.get('size') is not None:
                size = float(res['size'])
            if res.get('base') is not None:
                base_name = os.path.basename(res['base'])
                type = 'snapshot'
        # The text output is not parsed for an allocated size; reuse size.
        return cls(dir, name, format, type, size, base_name, size)

    @classmethod
    def get_new(cls, dir, name):
        """Create disk from path using ``qemu-img info --output=json``.

        A disk with a backing file becomes a 'snapshot', otherwise 'normal'.
        """
        path = os.path.realpath(dir + '/' + name)
        output = subprocess.check_output(
            ['qemu-img', 'info', '--force', '--output=json', path])
        disk_info = json.loads(output)
        base_path = disk_info.get('backing-filename')
        if base_path:
            base_name = os.path.basename(base_path)
            type = 'snapshot'
        else:
            base_name = None
            type = 'normal'
        return cls(dir, name, disk_info.get('format'), type,
                   disk_info.get('virtual-size'), base_name,
                   disk_info.get('actual-size'))

    @classmethod
    def get(cls, dir, name):
        """Create disk from path, preferring qemu-img's JSON output.

        If the JSON invocation fails or yields unparsable output (e.g. a
        qemu-img build without ``--output=json``), fall back to parsing the
        plain-text output. This replaces a string comparison of distro
        versions ('10' < '14.04') that also depended on platform.dist.
        """
        try:
            return cls.get_new(dir, name)
        except (subprocess.CalledProcessError, ValueError):
            logger.debug("qemu-img JSON info failed for %s, "
                         "falling back to text output", name)
            return cls.get_legacy(dir, name)

    def create(self):
        """Create a new empty image of self.format and self.size.

        :raises Exception: if the format is not in CREATE_FORMATS, the type
            is not 'normal' or the file already exists.
        :raises subprocess.CalledProcessError: if qemu-img fails.
        """
        if self.format not in self.CREATE_FORMATS:
            raise Exception('Invalid format: %s' % self.format)
        if self.type != 'normal':
            raise Exception('Invalid type: %s' % self.type)
        if os.path.isfile(self.get_path()):
            raise Exception('File already exists: %s' % self.get_path())
        cmdline = ['qemu-img',
                   'create',
                   '-f', self.format,
                   self.get_path(),
                   str(self.size)]
        logger.info("Create file: %s ", cmdline)
        subprocess.check_output(cmdline)

    def check_valid_image(self):
        """Check whether the image file is a qcow or iso image.

        On success self.format is set from the libmagic file description
        and True is returned; otherwise False.
        NOTE(review): 'qcow2-norm' is not in FORMATS; presumably it is the
        type name expected by the caller of download/import -- confirm
        before changing.
        """
        format_map = [
            ("qcow", "qcow2-norm"),
            ("iso", "iso"),
            ("x86 boot sector", "iso"),
        ]
        with magic.Magic() as m:
            ftype = m.id_filename(self.get_path())
            logger.debug("Downloaded file type is: %s", ftype)
            for file_type, disk_format in format_map:
                if file_type in ftype.lower():
                    self.format = disk_format
                    return True
        return False

    def download(self, task, url, parent_id=None):  # noqa
        """Download image from url into this disk's path.

        ``.gz``/``.bz2`` URLs are decompressed on the fly; a ``.zip`` holding
        one (iso) member is unpacked afterwards. Progress is published with
        task.update_state under parent_id (default: the task's own id).
        An abort during the transfer removes the file and returns quietly.

        :raises AbortException: if the task is aborted before the transfer.
        :raises FileTooBig: if Content-Length already exceeds maximum_size.
        :raises Exception: on a non-200 response, a transfer exceeding
            maximum_size, or a file that is neither qcow nor iso.
        """
        disk_path = self.get_path()
        logger.info("Downloading image from %s to %s", url, disk_path)
        r = requests.get(url, stream=True)
        try:
            self._download_response(task, url, r, parent_id)
        finally:
            # Release the streamed connection even when bailing out early.
            r.close()

    def _download_response(self, task, url, r, parent_id):
        """Write the body of response ``r`` to disk; see :meth:`download`."""
        disk_path = self.get_path()
        if r.status_code != 200:
            raise Exception("Invalid response status code: %s at %s" %
                            (r.status_code, url))

        if task.is_aborted():
            raise AbortException()
        if parent_id is None:
            parent_id = task.request.id
        chunk_size = 256 * 1024
        ext = url.split('.')[-1].lower()
        decompressor = None
        if ext == 'gz':
            # 16 + MAX_WBITS makes zlib expect a gzip header
            # undocumented zlib feature http://stackoverflow.com/a/2424549
            decompressor = decompressobj(16 + MAX_WBITS)
        elif ext == 'bz2':
            decompressor = BZ2Decompressor()
        clen = int(r.headers.get('content-length', maximum_size))
        if clen > maximum_size:
            raise FileTooBig()

        percent = 0
        actsize = 0
        try:
            with open(disk_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if decompressor is not None:
                        chunk = decompressor.decompress(chunk)
                    f.write(chunk)
                    actsize = f.tell()
                    if actsize > maximum_size:
                        raise FileTooBig()
                    # For compressed downloads clen is the compressed size,
                    # so the percentage is only an estimate (capped at 100).
                    new_percent = self._percent(actsize, clen)
                    if new_percent > percent:
                        percent = new_percent
                        if task.is_aborted():
                            raise AbortException()
                        self._report_progress(task, parent_id,
                                              actsize, percent)
                if ext == 'gz':
                    f.write(decompressor.flush())
                f.flush()
            self.size = Disk.get(self.dir, self.name).size
            logger.debug("Download finished %s (%s bytes)",
                         self.name, self.size)
        except AbortException:
            self._remove_if_exists(disk_path)
            logger.info("Download %s aborted %s removed.",
                        url, disk_path)
            return
        except FileTooBig:
            self._remove_if_exists(disk_path)
            raise Exception("%s file is too big. Maximum size "
                            "is %s" % (url, maximum_size))
        except BaseException:
            self._remove_if_exists(disk_path)
            logger.error("Download %s failed, %s removed.",
                         url, disk_path)
            raise

        if ext == 'zip' and is_zipfile(disk_path):
            task.update_state(
                task_id=parent_id,
                state=task.AsyncResult(parent_id).state,
                meta={'size': actsize, 'extracting': 'zip',
                      'percent': 99})
            self.extract_iso_from_zip(disk_path)
            # The archive was replaced by its content: refresh the size.
            self.size = Disk.get(self.dir, self.name).size
        if not self.check_valid_image():
            os.unlink(disk_path)
            raise Exception("Invalid file format. Only qcow and "
                            "iso files are allowed. Image from: %s" % url)

    def import_disk(self, task, url, port=None):
        """Fetch an image from scp source ``url`` and store it as qcow2.

        A qcow image is moved into place; anything else is converted with
        ``qemu-img convert``. An abort stops scp, removes partial files and
        returns quietly.

        :raises subprocess.CalledProcessError: if scp or qemu-img fails.
        :raises Exception: if the resulting image is not valid.
        """
        downloaded_file = os.path.join(self.dir, re.split('[:/]', url)[-1])
        cmdline = ['scp', '-B']
        if port is not None:
            cmdline.extend(['-P', str(port)])
        # scp copies SOURCE to TARGET: the remote url is the source here.
        cmdline.extend([url, downloaded_file])

        proc = subprocess.Popen(cmdline)
        try:
            while proc.poll() is None:
                if task.is_aborted():
                    raise AbortException()
                sleep(1)

            if task.is_aborted():
                raise AbortException()
            if proc.returncode != 0:
                raise subprocess.CalledProcessError(proc.returncode, cmdline)

            with magic.Magic() as m:
                ftype = m.id_filename(downloaded_file)

            if 'qcow' in ftype.lower():
                move(downloaded_file, self.get_path())
            else:
                subprocess.check_output(['ionice', '-c', 'idle',
                                         'qemu-img', 'convert',
                                         '-m', '4', '-O', 'qcow2',
                                         downloaded_file,
                                         self.get_path()])
        except AbortException:
            self._stop_process(proc)
            self._remove_if_exists(downloaded_file)
            self._remove_if_exists(self.get_path())
            logger.info("Import of disk %s aborted", self.name)
            return
        except BaseException:
            self._stop_process(proc)
            self._remove_if_exists(downloaded_file)
            self._remove_if_exists(self.get_path())
            raise

        self._remove_if_exists(downloaded_file)
        if not self.check_valid_image():
            os.unlink(self.get_path())
            raise Exception("Invalid file format.")
        self.size = Disk.get(self.dir, self.name).size

    def export(self, task, disk_format, upload_link, port=None):
        """Convert the disk to ``disk_format`` and upload it with scp.

        The converted copy ``<path>.<disk_format>`` is always removed
        afterwards. An abort stops scp and returns normally.

        :returns: the base name of the exported file.
        :raises subprocess.CalledProcessError: if qemu-img or scp fails.
        """
        exported_path = self.get_path() + '.' + disk_format
        cmdline = ['ionice', '-c', 'idle',
                   'qemu-img', 'convert']
        if disk_format == 'qcow2':
            cmdline.append('-c')
        cmdline.extend(['-m', '4', '-O', disk_format,
                        self.get_path(),
                        exported_path])

        proc = None
        try:
            subprocess.check_output(cmdline)

            cmdline = ['scp', '-B']
            if port is not None:
                cmdline.extend(['-P', str(port)])
            cmdline.extend([exported_path, upload_link])

            proc = subprocess.Popen(cmdline)
            while proc.poll() is None:
                if task.is_aborted():
                    raise AbortException()
                sleep(1)
            if proc.returncode != 0:
                raise subprocess.CalledProcessError(proc.returncode, cmdline)
        except AbortException:
            self._stop_process(proc)
            logger.info("Export of disk %s aborted", self.name)
        except BaseException:
            self._stop_process(proc)
            raise
        finally:
            self._remove_if_exists(exported_path)

        return os.path.basename(exported_path)

    def extract_iso_from_zip(self, disk_path):
        """Replace the zip archive at ``disk_path`` with its single image.

        A one-member archive is unpacked whatever the member's name;
        otherwise exactly one ``.iso`` member is required. In every other
        case the archive is kept unchanged.
        """
        with ZipFile(disk_path, 'r') as z:
            isos = z.namelist()
            if len(isos) != 1:
                isos = [i for i in isos
                        if i.lower().endswith('.iso')]
            if len(isos) != 1:
                logger.info("Extracting %s failed, keeping original.",
                            disk_path)
                return
            logger.info('Unzipping %s started.', disk_path)
            tmp_path = disk_path + '~'
            # Nested context managers: the member is closed even when
            # opening the temporary file fails.
            with z.open(isos[0]) as zf, open(tmp_path, 'wb') as f:
                copyfileobj(zf, f)
            move(tmp_path, disk_path)

    def snapshot(self):
        """Create this snapshot disk on top of its base image.

        iso snapshots are symlinks to the base; qcow2 snapshots are qemu-img
        overlays with the base as backing file; raw is unsupported.

        :raises Exception: if the type is not 'snapshot', the file already
            exists or the base image is missing.
        :raises NotImplementedError: for raw images.
        """
        if self.type != 'snapshot':
            raise Exception('Invalid type: %s' % self.type)
        if os.path.isfile(self.get_path()):
            raise Exception('File already exists: %s' % self.get_path())
        if not os.path.isfile(self.get_base()):
            raise Exception('Image Base does not exists: %s' % self.get_base())
        if self.format == 'iso':
            os.symlink(self.get_base(), self.get_path())
        elif self.format == 'raw':
            raise NotImplementedError()
        else:
            cmdline = ['ionice', '-c', 'idle',
                       'qemu-img', 'create',
                       '-b', self.get_base(),
                       '-f', self.format,
                       self.get_path()]
            subprocess.check_output(cmdline)

    def merge_disk_with_base(self, task, new_disk, parent_id=None):
        """Flatten this disk and its base into ``new_disk`` with qemu-img.

        Progress is estimated by polling the output file size once a second.
        On abort the partial output is removed and the method returns
        quietly; on any other error it is removed and the error re-raised
        (including a non-zero qemu-img exit status).
        """
        if parent_id is None:
            parent_id = task.request.id
        proc = None
        try:
            cmdline = ['ionice', '-c', 'idle',
                       'qemu-img', 'convert', '-m', '4']
            if new_disk.format == 'qcow2':
                cmdline.append('-c')
            cmdline.extend(['-O', new_disk.format,
                            self.get_path(), new_disk.get_path()])
            logger.debug(
                "Merging %s into %s. %s", self.get_path(),
                new_disk.get_path(), cmdline)
            percent = 0
            diff_disk = Disk.get(self.dir, self.name)
            base_disk = Disk.get(self.dir, self.base_name)
            # Estimated output size used as 100 %.
            clen = min(base_disk.actual_size + diff_disk.actual_size,
                       diff_disk.size)
            output = new_disk.get_path()
            proc = subprocess.Popen(cmdline)
            while proc.poll() is None:
                try:
                    actsize = os.path.getsize(output)
                except OSError:
                    actsize = 0
                new_percent = self._percent(actsize, clen)
                if new_percent > percent:
                    percent = new_percent
                    if task.is_aborted():
                        logger.warning(
                            "Merging new disk %s is aborted by user.",
                            new_disk.get_path())
                        raise AbortException()
                    self._report_progress(task, parent_id, actsize, percent)
                sleep(1)
            if proc.returncode != 0:
                raise subprocess.CalledProcessError(proc.returncode, cmdline)
        except AbortException:
            self._stop_process(proc)
            logger.warning("Aborted merge job, removing %s",
                           new_disk.get_path())
            self._remove_if_exists(new_disk.get_path())
        except BaseException:
            self._stop_process(proc)
            logger.exception("Unknown error occured, removing %s ",
                             new_disk.get_path())
            self._remove_if_exists(new_disk.get_path())
            raise

    def merge_disk_without_base(self, task, new_disk, parent_id=None,
                                length=1024 * 1024):
        """Copy this image into ``new_disk`` ``length`` bytes at a time.

        Progress is measured against self.size. On abort the partial copy is
        removed and the method returns quietly; on any other error it is
        removed and the error re-raised.
        """
        if parent_id is None:
            parent_id = task.request.id
        try:
            with open(self.get_path(), 'rb') as fsrc, \
                    open(new_disk.get_path(), 'wb') as fdst:
                clen = self.size
                actsize = 0
                percent = 0
                while True:
                    buf = fsrc.read(length)
                    if not buf:
                        break
                    fdst.write(buf)
                    actsize += len(buf)
                    new_percent = self._percent(actsize, clen)
                    if new_percent > percent:
                        percent = new_percent
                        if task.is_aborted():
                            logger.warning(
                                "Merging new disk %s is aborted by user.",
                                new_disk.get_path())
                            raise AbortException()
                        self._report_progress(task, parent_id,
                                              actsize, percent)
        except AbortException:
            logger.warning("Aborted remove %s", new_disk.get_path())
            self._remove_if_exists(new_disk.get_path())
        except BaseException:
            logger.exception("Unknown error occured removing %s ",
                             new_disk.get_path())
            self._remove_if_exists(new_disk.get_path())
            raise

    def merge(self, task, new_disk, parent_id=None):
        """Merge ``new_disk`` from the actual disk and its base.

        iso images are symlinked, disks with a base are flattened with
        qemu-img, disks without one are copied byte for byte.

        :raises AbortException: if the task is already aborted.
        :raises Exception: if ``new_disk``'s file already exists.
        """
        if task.is_aborted():
            raise AbortException()

        if os.path.isfile(new_disk.get_path()):
            raise Exception('File already exists: %s' % new_disk.get_path())

        if self.format == "iso":
            os.symlink(self.get_path(), new_disk.get_path())
        elif self.base_name:
            self.merge_disk_with_base(task, new_disk, parent_id)
        else:
            self.merge_disk_without_base(task, new_disk, parent_id)

    def delete(self):
        """ Delete file. """
        if os.path.isfile(self.get_path()):
            os.unlink(self.get_path())

    @classmethod
    def list(cls, dir):
        """ List all files in <dir> directory."""
        return [cls.get(dir, file) for file in os.listdir(dir)]

    @staticmethod
    def _percent(done, total):
        """Return ``done`` as a percentage of ``total``, capped at 100.

        An unknown or non-positive total counts as complete instead of
        raising ZeroDivisionError/TypeError.
        """
        if not total or total <= 0:
            return 100
        return min(100, round(done * 100.0 / total))

    @staticmethod
    def _report_progress(task, parent_id, size, percent):
        """Publish size/percent as meta of parent_id, keeping its state."""
        task.update_state(
            task_id=parent_id,
            state=task.AsyncResult(parent_id).state,
            meta={'size': size, 'percent': percent})

    @staticmethod
    def _remove_if_exists(path):
        """Delete ``path`` if present, so cleanup never masks the real error."""
        if os.path.lexists(path):
            os.unlink(path)

    @staticmethod
    def _stop_process(proc):
        """Terminate ``proc`` if it is still running and reap it."""
        if proc is not None and proc.poll() is None:
            proc.terminate()
            proc.wait()