vmdriver.py 16.6 KB
Newer Older
1
""" Driver for libvirt. """
tarokkk committed
2 3
import libvirt
import logging
4
import os
user committed
5
import sys
Guba Sándor committed
6 7
import socket
import json
8
from decorator import decorator
9
import lxml.etree as ET
10

11
from psutil import NUM_CPUS, virtual_memory, cpu_percent
12

13
from celery.contrib.abortable import AbortableTask
14

15
from vm import VMInstance, VMDisk, VMNetwork
Guba Sándor committed
16

17
from vmcelery import celery, lib_connection, to_bool
tarokkk committed
18

user committed
19 20
# Make the directory that contains this file importable.  The previous
# dirname(basename(__file__)) always evaluated to '' and therefore never
# pointed at this module's directory.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Read by wrap_libvirtError when it builds an error message; None means
# there is no XML dump to attach.
vm_xml_dump = None

# Maps the numeric domain state reported by libvirt to its symbolic name
# (used by _parse_info).
state_dict = {0: 'NOSTATE',
              1: 'RUNNING',
              2: 'BLOCKED',
              3: 'PAUSED',
              4: 'SHUTDOWN',
              5: 'SHUTOFF',
              6: 'CRASHED',
              7: 'PMSUSPENDED'
              }

tarokkk committed
33

34
# class Singleton(type):
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
#
#    """ Singleton class."""
#
#    _instances = {}
#
#    def __call__(cls, *args, **kwargs):
#        if cls not in cls._instances:
#            cls._instances[cls] = super(Singleton, cls).__call__(*args,
#                                                                 **kwargs)
#        return cls._instances[cls]


class Connection(object):

    """ Shared holder for the libvirt connection.

    The connection lives in a class attribute, so every caller sees the
    same value; ``None`` means that no connection is stored.

    """

    connection = None

    @classmethod
    def set(cls, connection):
        """ Store ``connection`` as the shared libvirt connection."""
        cls.connection = connection

    @classmethod
    def get(cls):
        """ Return the stored libvirt connection (``None`` when unset)."""
        return cls.connection


67 68
@decorator
def req_connection(original_function, *args, **kw):
    """Connection checking decorator for libvirt.

    If no connection is stored in ``Connection``, one is set up with
    ``connect()`` for the duration of the call and released again with
    ``disconnect()``, even when the wrapped function raises.  (``connect``
    uses the celery worker's connection when the environment variable
    LIBVIRT_KEEPALIVE is set to a true value.)
    If a connection is already stored it is reused and left untouched.

    Return the return value of the decorated function.

    """
    logging.debug("Decorator running")
    if Connection.get() is not None:
        # Implicit literal concatenation keeps the message on one line;
        # a backslash inside the literal would embed the indentation.
        logging.debug("Decorator calling original "
                      "function with active connection")
        return original_function(*args, **kw)

    connect()
    try:
        logging.debug("Decorator calling original function")
        return original_function(*args, **kw)
    finally:
        logging.debug("Finally part of decorator")
        disconnect()
tarokkk committed
92 93


Guba Sándor committed
94 95
@decorator
def wrap_libvirtError(original_function, *args, **kw):
    """ Decorator to wrap libvirt error in simple Exception.

    Calls the decorated function and returns its result.  A
    ``libvirt.libvirtError`` is logged and re-raised as a plain
    ``Exception`` whose ``libvirtError`` attribute is True.  The message
    is libvirt's error message, followed by the module-level
    ``vm_xml_dump`` on a new line when that is not None.

    """
    try:
        return original_function(*args, **kw)
    except libvirt.libvirtError as e:
        e_msg = e.get_error_message()
        logging.error(e_msg)
        if vm_xml_dump is not None:
            e_msg += "\n"
            e_msg += vm_xml_dump
        # Raise with the assembled message; it used to be built and then
        # discarded in favour of the bare libvirt message.
        new_e = Exception(e_msg)
        new_e.libvirtError = True
        raise new_e


@wrap_libvirtError
def connect(connection_string='qemu:///system'):
    """ Make a libvirt connection available through ``Connection``.

    When the environment variable LIBVIRT_KEEPALIVE evaluates to true
    (via ``to_bool``) the connection of the celery worker
    (``lib_connection``) is stored.  Otherwise a new connection is
    opened to ``connection_string`` (default: the local system daemon)
    unless one is already stored.

    """
    keepalive = to_bool(os.getenv('LIBVIRT_KEEPALIVE', "False"))
    if keepalive:
        Connection.set(lib_connection)
        logging.debug("Using celery libvirt connection connection.")
        return

    if Connection.get() is not None:
        logging.debug("There is already an active connection to libvirt.")
        return

    Connection.set(libvirt.open(connection_string))
    logging.debug("Connection estabilished to libvirt.")
tarokkk committed
131

tarokkk committed
132

Guba Sándor committed
133
@wrap_libvirtError
def disconnect():
    """ Disconnect from the active libvirt daemon connection.

    Nothing is closed when LIBVIRT_KEEPALIVE evaluates to true.  The
    variable is interpreted with ``to_bool`` exactly as in ``connect()``,
    so a connection opened by ``connect()`` is always closed here.
    The stored connection is cleared even if closing it fails.

    """
    if to_bool(os.getenv('LIBVIRT_KEEPALIVE', "False")):
        logging.debug('Keepalive connection should not close.')
        return

    connection = Connection.get()
    if connection is None:
        logging.debug('There is no available libvirt conection.')
        return

    try:
        connection.close()
        logging.debug('Connection closed to libvirt.')
    finally:
        # Never keep a reference to a connection we tried to close.
        Connection.set(None)
tarokkk committed
145 146


Guba Sándor committed
147
@celery.task
@req_connection
@wrap_libvirtError
def define(vm):
    """ Define a permanent virtual machine from the XML dump of ``vm``. """
    conn = Connection.get()
    conn.defineXML(vm.dump_xml())
    logging.info("Virtual machine %s is defined from xml", vm.name)


Guba Sándor committed
156
@celery.task
@req_connection
@wrap_libvirtError
def create(vm_desc):
    """ Create and start non-permanent virtual machine from xml.

    The hypervisor type is taken from the HYPERVISOR_TYPE environment
    variable (default "test").  The domain is left paused: real drivers
    are started with VIR_DOMAIN_START_PAUSED, on the "test" driver the
    domain is created and then suspended.  Finally the context server
    on 127.0.0.1:1235 is sent the boot token and the serial socket path;
    a socket error there is only logged.

    Return the XML description the domain was created from.

    flags can be:
        VIR_DOMAIN_NONE = 0
        VIR_DOMAIN_START_PAUSED = 1
        VIR_DOMAIN_START_AUTODESTROY = 2
        VIR_DOMAIN_START_BYPASS_CACHE = 4
        VIR_DOMAIN_START_FORCE_BOOT = 8

    """
    vm = VMInstance.deserialize(vm_desc)
    # Setting proper hypervisor
    vm.vm_type = os.getenv("HYPERVISOR_TYPE", "test")
    if vm.vm_type == "test":
        vm.arch = "i686"
    # NOTE(review): this is a local name; the module-level vm_xml_dump
    # read by wrap_libvirtError is not updated here.
    vm_xml_dump = vm.dump_xml()
    logging.info(vm_xml_dump)
    # Emulating DOMAIN_START_PAUSED FLAG behaviour on test driver
    if vm.vm_type == "test":
        Connection.get().createXML(
            vm_xml_dump, libvirt.VIR_DOMAIN_NONE)
        domain = lookupByName(vm.name)
        domain.suspend()
    # Real driver create
    else:
        Connection.get().createXML(
            vm_xml_dump, libvirt.VIR_DOMAIN_START_PAUSED)
        logging.info("Virtual machine %s is created from xml", vm.name)
    # context
    try:
        sock = socket.create_connection(('127.0.0.1', 1235), 3)
        try:
            data = {'boot_token': vm.boot_token,
                    'socket': '/var/lib/libvirt/serial/%s' % vm.name}
            # json.dumps emits ASCII by default, so encoding is a no-op
            # on Python 2 and yields the bytes sendall needs on Python 3.
            sock.sendall(json.dumps(data).encode('utf-8'))
        finally:
            # Close the socket even when sending fails.
            sock.close()
    except socket.error:
        logging.error('Unable to connect to context server')
    return vm_xml_dump
tarokkk committed
199 200


201
class shutdown(AbortableTask):
    """ Shutdown virtual machine (need ACPI support).

    Requests a guest shutdown, then looks the domain up again every
    5 seconds.  Returns when the domain is missing or when the task has
    been aborted.
    This job is abortable:
        AbortableAsyncResult(id="<<jobid>>").abort()

    """
    time_limit = 120

    @req_connection
    def run(self, args):
        from time import sleep
        (name,) = args
        logging.info("Shutdown started for vm: %s", name)
        try:
            vm_domain = lookupByName(name)
            logging.info("%s domain found in shutdown", name)
            vm_domain.shutdown()
            logging.info("Domain shutdown called for vm: %s", name)
            while True:
                try:
                    Connection.get().lookupByName(name)
                except libvirt.libvirtError as lookup_error:
                    gone = (lookup_error.get_error_code() ==
                            libvirt.VIR_ERR_NO_DOMAIN)
                    if gone:
                        # The domain no longer exists: shutdown finished.
                        return
                    raise
                # Domain is still present: stop on abort, else wait.
                if self.is_aborted():
                    logging.info("Shutdown aborted on vm: %s", name)
                    return
                sleep(5)
        except libvirt.libvirtError as error:
            # Same conversion as wrap_libvirtError performs.
            wrapped = Exception(error.get_error_message())
            wrapped.libvirtError = True
            raise wrapped
236 237 238 239


@celery.task
@req_connection
@wrap_libvirtError
def delete(name):
    """ Destroy the running virtual machine called 'name'. """
    lookupByName(name).destroy()


Guba Sándor committed
247
@celery.task
@req_connection
@wrap_libvirtError
def list_domains():
    """ List the running domains.

    :return list: Names of the domains whose IDs libvirt lists.

    """
    conn = Connection.get()
    return [conn.lookupByID(dom_id).name()
            for dom_id in conn.listDomainsID()]
tarokkk committed
261 262


Guba Sándor committed
263
@celery.task
@req_connection
@wrap_libvirtError
def list_domains_info():
    """ List the running domains with their info.

    :return list: One info dict per domain (see ``_parse_info``) with
                  the domain name added under the key 'name'.

    """
    conn = Connection.get()
    infos = []
    for dom_id in conn.listDomainsID():
        dom = conn.lookupByID(dom_id)
        entry = _parse_info(dom.info())
        entry['name'] = dom.name()
        infos.append(entry)
    return infos


@celery.task
@req_connection
@wrap_libvirtError
def lookupByName(name):
    """ Look the domain called 'name' up on the connection and return it. """
    conn = Connection.get()
    return conn.lookupByName(name)
tarokkk committed
287 288


Guba Sándor committed
289
@celery.task
@req_connection
@wrap_libvirtError
def undefine(name):
    """ Undefine an already defined virtual machine.

    If it's running it becomes transient (lost on reboot)

    """
    lookupByName(name).undefine()


Guba Sándor committed
302
@celery.task
@req_connection
@wrap_libvirtError
def start(name):
    """ Start the already defined virtual machine called 'name'. """
    lookupByName(name).create()


Guba Sándor committed
312
@celery.task
@req_connection
@wrap_libvirtError
def suspend(name):
    """ Stop virtual machine and keep memory in RAM.

    Return the domain info dict read after the suspend call.

    """
    dom = lookupByName(name)
    dom.suspend()
    info = dom.info()
    return _parse_info(info)
325 326 327 328


@celery.task
@req_connection
@wrap_libvirtError
def save(name, path):
    """ Stop the virtual machine 'name' and save its memory to path. """
    lookupByName(name).save(path)


Guba Sándor committed
337
@celery.task
@req_connection
@wrap_libvirtError
def restore(name, path):
    """ Restore a saved virtual machine.

    The machine is restored from the memory image stored at path;
    afterwards the info of the domain called 'name' is looked up.
    Return the domain info dict.

    """
    conn = Connection.get()
    conn.restore(path)
    return domain_info(name)
Guba Sándor committed
350 351


Guba Sándor committed
352
@celery.task
@req_connection
@wrap_libvirtError
def resume(name):
    """ Resume stopped virtual machines.

    Return the domain info dict read after the resume call.

    """
    dom = lookupByName(name)
    dom.resume()
    info = dom.info()
    return _parse_info(info)
tarokkk committed
365 366


Guba Sándor committed
367
@celery.task
@req_connection
@wrap_libvirtError
def reset(name):
    """ Reset (power reset) virtual machine.

    Return the domain info dict read after the reset call.

    """
    dom = lookupByName(name)
    dom.reset(0)
    info = dom.info()
    return _parse_info(info)
tarokkk committed
380 381


Guba Sándor committed
382
@celery.task
@req_connection
@wrap_libvirtError
def reboot(name):
    """ Reboot (with guest acpi support) virtual machine.

    Return the domain info dict read after the reboot call.

    """
    dom = lookupByName(name)
    dom.reboot(0)
    info = dom.info()
    return _parse_info(info)
394 395


Guba Sándor committed
396
@celery.task
@req_connection
@wrap_libvirtError
def node_info():
    """ Get info from Host as dict.

    The values returned by the connection's ``getInfo()`` are paired,
    in order, with these keys:

    model    string indicating the CPU model
    memory   memory size in kilobytes
    cpus     the number of active CPUs
    mhz      expected CPU frequency
    nodes    the number of NUMA cell, 1 for unusual NUMA
             topologies or uniform memory access;
             check capabilities XML for the actual NUMA topology
    sockets  number of CPU sockets per node if nodes > 1,
             1 in case of unusual NUMA topology
    cores    number of cores per socket, total number of
             processors in case of unusual NUMA topology
    threads  number of threads per core, 1 in case of unusual NUMA topology

    """
    fields = ('model', 'memory', 'cpus', 'mhz',
              'nodes', 'sockets', 'cores', 'threads')
    host_values = Connection.get().getInfo()
    return dict(zip(fields, host_values))


425
def _parse_info(values):
    """ Turn the sequence returned by a domain's ``info()`` into a dict.

    The values are paired, in order, with the keys 'state', 'maxmem',
    'memory', 'virtcpunum' and 'cputime'; the numeric state is replaced
    by its name from ``state_dict``.

    Return the info dict.

    """
    fields = ('state', 'maxmem', 'memory', 'virtcpunum', 'cputime')
    parsed = dict(zip(fields, values))
    # Replace the numeric state code with its symbolic name.
    parsed['state'] = state_dict[parsed['state']]
    return parsed


Guba Sándor committed
439
@celery.task
@req_connection
@wrap_libvirtError
def domain_info(name):
    """ Get the info of the domain called 'name' from libvirt.

    Return the domain info dict:
    state       the running state, one of virDomainState
    maxmem      the maximum memory in KBytes allowed
    memory      the memory in KBytes used by the domain
    virtcpunum  the number of virtual CPUs for the domain
    cputime     the CPU time used in nanoseconds

    """
    raw_info = lookupByName(name).info()
    return _parse_info(raw_info)
455 456


Guba Sándor committed
457
@celery.task
@req_connection
@wrap_libvirtError
def network_info(name, network):
    """ Return the statistics of interface ``network`` of domain 'name'.

    The values of ``interfaceStats`` are paired, in order, with:

    rx_bytes
    rx_packets
    rx_errs
    rx_drop
    tx_bytes
    tx_packets
    tx_errs
    tx_drop

    """
    fields = ('rx_bytes', 'rx_packets', 'rx_errs', 'rx_drop',
              'tx_bytes', 'tx_packets', 'tx_errs', 'tx_drop')
    stats = lookupByName(name).interfaceStats(network)
    return dict(zip(fields, stats))


Guba Sándor committed
481
@celery.task
@req_connection
@wrap_libvirtError
def send_key(name, key_code):
    """ Send a single linux key_code to the vm called 'name'.

    key_code can be obtained from linux_keys.py
    e.g.: linuxkeys.KEY_RIGHTCTRL

    """
    dom = lookupByName(name)
    codes = [key_code]
    dom.sendKey(libvirt.VIR_KEYCODE_SET_LINUX, 100, codes, 1, 0)


def _stream_handler(stream, buf, opaque):
    """ Stream receive callback: write the chunk ``buf`` into ``opaque``.

    ``stream`` is not used; ``opaque`` only needs a ``write`` method.

    """
    opaque.write(buf)
497 498


Guba Sándor committed
499
@celery.task
@req_connection
@wrap_libvirtError
def screenshot(name):
    """Save screenshot of virtual machine.

    A right-ctrl key press is sent first to wake up the console, then
    the screenshot is read through a libvirt stream and converted to png.

    Returns a BytesIO object that contains the screenshot in png format;
    the buffer is returned as written, without being rewound.
    """
    from io import BytesIO
    from PIL import Image
    # Import linuxkeys to get defines
    import linuxkeys

    dom = lookupByName(name)
    # Send key to wake up console
    wake_keys = [linuxkeys.KEY_RIGHTCTRL]
    dom.sendKey(libvirt.VIR_KEYCODE_SET_LINUX, 100, wake_keys, 1, 0)
    # The stream has to come from the connection
    stream = Connection.get().newStream(0)
    # Take screenshot accessible by stream (return mimetype)
    dom.screenshot(stream, 0, 0)
    # Collect the raw image data in memory
    raw = BytesIO()
    try:
        stream.recvAll(_stream_handler, raw)
    finally:
        stream.finish()
    # Convert ppm to png: rewind the raw data before reading it back
    raw.seek(0)
    png = BytesIO()
    Image.open(raw).save(png, format='PNG')
    return png
534 535


Guba Sándor committed
536
@celery.task
@req_connection
@wrap_libvirtError
def migrate(name, host, live=False):
    """ Migrate the domain called 'name' to host.

    The migration is peer to peer, to "qemu+tcp://<host>/system", and
    keeps the domain name; ``live`` adds the live migration flag.

    """
    live_flag = libvirt.VIR_MIGRATE_LIVE if live else 0
    flags = libvirt.VIR_MIGRATE_PEER2PEER | live_flag
    domain = lookupByName(name)
    destination = "qemu+tcp://" + host + "/system"
    domain.migrateToURI(duri=destination, flags=flags,
                        dname=name, bandwidth=0)
550
    # return _parse_info(domain.info())
Őry Máté committed
551

552

Őry Máté committed
553
@celery.task
@req_connection
@wrap_libvirtError
def attach_disk(name, disk):
    """ Attach Disk to a running virtual machine.

    ``disk`` is the serialized form accepted by ``VMDisk.deserialize``.

    """
    domain = lookupByName(name)
    vm_disk = VMDisk.deserialize(disk)
    domain.attachDevice(vm_disk.dump_xml())


@celery.task
@req_connection
@wrap_libvirtError
def detach_disk(name, disk):
    """ Detach disk from a running virtual machine.

    ``disk`` is the serialized form accepted by ``VMDisk.deserialize``.
    Raises an Exception when the disk is still present afterwards.

    """
    domain = lookupByName(name)
    vm_disk = VMDisk.deserialize(disk)
    domain.detachDevice(vm_disk.dump_xml())
    # Libvirt does NOT report failed detach so test it.
    __check_detach(domain, vm_disk.source)


def __check_detach(domain, disk):
    """ Test if detach was successful by searching for disk in the XML.

    ``domain`` must provide ``XMLDesc()``; ``disk`` is the source string
    of the detached disk.  Every ``<disk>`` under ``<devices>`` is
    inspected: the detach counts as failed when ``disk`` occurs in any
    attribute value of its ``<source>`` element.  Disks without a
    ``<source>`` element are skipped.

    Raises Exception when the disk is still present.

    """
    root = ET.fromstring(domain.XMLDesc())
    devices = root.find('devices')
    if devices is None:
        return
    for d in devices.findall("disk"):
        source = d.find('source')
        if source is None:
            # No <source> element on this disk: nothing to compare.
            continue
        # Look at every attribute, not only the first one, so sources
        # described by several attributes are matched as well.
        if any(disk in value for value in source.attrib.values()):
            raise Exception("Disk could not been detached. "
                            "Check if hot plug support is "
                            "enabled (acpiphp module on Linux).")
586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607


@celery.task
@req_connection
@wrap_libvirtError
def attach_network(name, net):
    """ Attach a network device to the virtual machine called 'name'.

    ``net`` is the serialized form accepted by ``VMNetwork.deserialize``.

    """
    domain = lookupByName(name)
    net = VMNetwork.deserialize(net)
    net_xml = net.dump_xml()
    # Diagnostic output only: logged at info level like the XML dump in
    # create(), not as an error.
    logging.info(net_xml)
    domain.attachDevice(net_xml)


@celery.task
@req_connection
@wrap_libvirtError
def detach_network(name, net):
    """ Detach a network device from the virtual machine called 'name'.

    ``net`` is the serialized form accepted by ``VMNetwork.deserialize``.

    """
    domain = lookupByName(name)
    vm_net = VMNetwork.deserialize(net)
    domain.detachDevice(vm_net.dump_xml())


@celery.task
@req_connection
@wrap_libvirtError
def resize_disk(name, path, size):
    """ Resize the block device ``path`` of the domain called 'name'.

    ``size`` is converted with ``int()`` and divided by 1024 before it
    is handed to ``blockResize`` with flags 0.

    """
    domain = lookupByName(name)
    # domain.blockResize(path, int(size),
    #                    flags=libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES)
    # To be compatible with libvirt < 0.9.11
    # Floor division keeps the value an integer on Python 3 as well.
    domain.blockResize(path, int(size) // 1024, 0)
Guba Sándor committed
616 617 618


@celery.task
def ping():
    """ Trivial task that always returns True.

    NOTE(review): presumably used to check that the worker answers -
    confirm with the callers.

    """
    return True
621 622 623


@celery.task
@req_connection
@wrap_libvirtError
def get_architecture():
    """ Return the host CPU architecture from the libvirt capabilities.

    The value is the text of the ``host/cpu/arch`` element of the
    capabilities XML, or None when that element is absent.  The element
    is addressed by path rather than by child position, so the result
    does not depend on which other elements ``<host>`` contains.

    """
    xml = Connection.get().getCapabilities()
    return ET.fromstring(xml).findtext('host/cpu/arch')


@celery.task
def get_core_num():
    """ Return the CPU count as reported by psutil's NUM_CPUS. """
    return NUM_CPUS

Őry Máté committed
636

637 638 639
@celery.task
def get_ram_size():
    """ Return the ``total`` field of psutil's virtual_memory(). """
    memory = virtual_memory()
    return memory.total
640 641 642


@celery.task
def get_driver_version():
    """ Describe the git checkout found in the current working directory.

    Return a dict with the keys 'branch', 'commit', 'commit_text' and
    'is_dirty' taken from the repository head, or None (after logging
    the exception) when the repository cannot be read.

    """
    from git import Repo
    try:
        repo = Repo(path=os.getcwd())
        head_commit = repo.head.commit
        version = {
            'branch': repo.active_branch.name,
            'commit': head_commit.hexsha,
            'commit_text': head_commit.summary,
            'is_dirty': repo.is_dirty(),
        }
    except Exception as e:
        logging.exception("Unhandled exception: %s", e)
        version = None
    return version
655 656 657


@celery.task
def get_info():
    """ Collect the core number, RAM size, architecture and driver
    version of this node into one dict.

    """
    core_num = get_core_num()
    ram_size = get_ram_size()
    architecture = get_architecture()
    driver_version = get_driver_version()
    return {'core_num': core_num,
            'ram_size': ram_size,
            'architecture': architecture,
            'driver_version': driver_version}
663 664 665


@celery.task
def get_node_metrics():
    """ Return psutil's current CPU and memory usage percentages.

    The dict has the keys 'cpu.usage' (``cpu_percent(0)``) and
    'memory.usage' (``virtual_memory().percent``).

    """
    cpu_usage = cpu_percent(0)
    memory_usage = virtual_memory().percent
    return {'cpu.usage': cpu_usage, 'memory.usage': memory_usage}