vmdriver.py 15.6 KB
Newer Older
1
""" Driver for libvirt. """
tarokkk committed
2 3
import libvirt
import logging
4
import os
user committed
5
import sys
Guba Sándor committed
6 7
import socket
import json
8
from decorator import decorator
9
import lxml.etree as ET
10

11
from psutil import NUM_CPUS, virtual_memory, cpu_percent
12

13
from celery.contrib.abortable import AbortableTask
14

15
from vm import VMInstance, VMDisk, VMNetwork
16
from vmcelery import celery, lib_connection, to_bool
tarokkk committed
17

user committed
18 19
# Make modules located next to this file importable.  The previous
# dirname(basename(__file__)) expression always evaluated to '' because
# basename() strips the directory part before dirname() can see it.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

20
# Module-level slot for a domain XML dump; wrap_libvirtError consults it
# (when it is not None) while handling a libvirt error.
# NOTE(review): create() assigns a *local* variable of the same name and has
# no `global` statement, so nothing visible in this file ever sets this.
vm_xml_dump = None
tarokkk committed
21

Guba Sándor committed
22 23 24 25 26 27 28 29 30 31
# Maps the numeric state found in a domain info tuple to its symbolic name;
# the position of each name in the tuple below is its numeric code.
state_dict = dict(enumerate((
    'NOSTATE',
    'RUNNING',
    'BLOCKED',
    'PAUSED',
    'SHUTDOWN',
    'SHUTOFF',
    'CRASHED',
    'PMSUSPENDED',
)))

tarokkk committed
32

33
# class Singleton(type):
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
#
#    """ Singleton class."""
#
#    _instances = {}
#
#    def __call__(cls, *args, **kwargs):
#        if cls not in cls._instances:
#            cls._instances[cls] = super(Singleton, cls).__call__(*args,
#                                                                 **kwargs)
#        return cls._instances[cls]


class Connection(object):

    """ Holder of the libvirt connection shared by the whole module.

    The connection lives in a class attribute, so it is reached through
    the class itself via get() and set().

    """

#    __metaclass__ = Singleton
    # Currently stored connection object; None means "not connected".
    connection = None

    @classmethod
    def get(cls):
        """ Return the stored libvirt connection (None when unset)."""
        return cls.connection

    @classmethod
    def set(cls, connection):
        """ Store connection as the current libvirt connection.

        Passing None clears the stored connection.

        """
        cls.connection = connection


66 67
@decorator
def req_connection(original_function, *args, **kw):
    """Connection checking decorator for libvirt.

    When no connection is stored in Connection, one is set up with
    connect() before the wrapped function runs and released with
    disconnect() afterwards, even if the function raises.  When a
    connection is already stored, the wrapped function simply reuses it
    and the connection is left untouched.

    Return the return value of the decorated function.

    """
    logging.debug("Decorator running")
    if Connection.get() is not None:
        # Somebody further up the call chain owns the connection; do not
        # close it here.  (Adjacent literals keep the message free of the
        # indentation whitespace a backslash continuation would embed.)
        logging.debug("Decorator calling original "
                      "function with active connection")
        return original_function(*args, **kw)

    connect()
    try:
        logging.debug("Decorator calling original function")
        return original_function(*args, **kw)
    finally:
        logging.debug("Finally part of decorator")
        disconnect()
tarokkk committed
91 92


Guba Sándor committed
93 94
@decorator
def wrap_libvirtError(original_function, *args, **kw):
    """ Decorator to wrap libvirt error in simple Exception.

    A libvirt.libvirtError raised by the decorated function is logged and
    re-raised as a plain Exception carrying the libvirt error message and
    an attribute ``libvirtError = True``.  When the module-level
    vm_xml_dump is not None it is appended to the message on a new line.

    Return the return value of the decorated function.

    """
    try:
        return original_function(*args, **kw)
    except libvirt.libvirtError as e:
        e_msg = e.get_error_message()
        logging.error(e_msg)
        if vm_xml_dump is not None:
            e_msg += "\n"
            e_msg += vm_xml_dump
        # Raise with the assembled message; previously e_msg was built and
        # then discarded in favour of the bare libvirt message.
        new_e = Exception(e_msg)
        new_e.libvirtError = True
        raise new_e


@wrap_libvirtError
def connect(connection_string='qemu:///system'):
    """ Make a libvirt connection available through Connection.

    When the LIBVIRT_KEEPALIVE environment variable evaluates to true
    (via to_bool) the celery worker's lib_connection is stored.  Otherwise
    a new connection is opened with connection_string, unless one is
    already stored.

    """
    keepalive = to_bool(os.getenv('LIBVIRT_KEEPALIVE', "False"))
    if keepalive:
        Connection.set(lib_connection)
        logging.debug("Using celery libvirt connection connection.")
        return

    if Connection.get() is not None:
        logging.debug("There is already an active connection to libvirt.")
        return

    Connection.set(libvirt.open(connection_string))
    logging.debug("Connection estabilished to libvirt.")
tarokkk committed
130

tarokkk committed
131

Guba Sándor committed
132
@wrap_libvirtError
def disconnect():
    """ Disconnect from the active libvirt daemon connection.

    The keepalive decision is made exactly like in connect(): only when
    LIBVIRT_KEEPALIVE evaluates to true (via to_bool) is the connection
    left open.  Testing merely for the presence of the variable would
    leave a privately opened connection unclosed when the variable is set
    to a false value.

    """
    if to_bool(os.getenv('LIBVIRT_KEEPALIVE', "False")):
        logging.debug('Keepalive connection should not close.')
        return

    active = Connection.get()
    if active is None:
        logging.debug('There is no available libvirt conection.')
        return

    active.close()
    logging.debug('Connection closed to libvirt.')
    Connection.set(None)
tarokkk committed
144 145


Guba Sándor committed
146
@celery.task
@req_connection
@wrap_libvirtError
def define(vm):
    """ Define a permanent virtual machine from the XML dumped by vm. """
    hypervisor = Connection.get()
    hypervisor.defineXML(vm.dump_xml())
    logging.info("Virtual machine %s is defined from xml", vm.name)


Guba Sándor committed
155
@celery.task
@req_connection
@wrap_libvirtError
def create(vm_desc):
    """ Create and start non-permanent virtual machine from xml.

    vm_desc is deserialized with VMInstance.deserialize.  The domain is
    created paused: directly with VIR_DOMAIN_START_PAUSED, or, for the
    "test" hypervisor type, by creating it and suspending it right after.
    Afterwards the boot token and serial socket path are sent to the
    context server on 127.0.0.1:1235 on a best-effort basis.

    Return the XML description the domain was created from.

    flags can be:
        VIR_DOMAIN_NONE = 0
        VIR_DOMAIN_START_PAUSED = 1
        VIR_DOMAIN_START_AUTODESTROY = 2
        VIR_DOMAIN_START_BYPASS_CACHE = 4
        VIR_DOMAIN_START_FORCE_BOOT = 8

    """
    vm = VMInstance.deserialize(vm_desc)
    # Setting proper hypervisor
    vm.vm_type = os.getenv("HYPERVISOR_TYPE", "test")
    if vm.vm_type == "test":
        vm.arch = "i686"
    # NOTE(review): this is a local name; the module-level vm_xml_dump is
    # not updated because there is no `global` statement.
    vm_xml_dump = vm.dump_xml()
    logging.info(vm_xml_dump)
    # Emulating DOMAIN_START_PAUSED FLAG behaviour on test driver
    if vm.vm_type == "test":
        Connection.get().createXML(
            vm_xml_dump, libvirt.VIR_DOMAIN_NONE)
        domain = lookupByName(vm.name)
        domain.suspend()
    # Real driver create
    else:
        Connection.get().createXML(
            vm_xml_dump, libvirt.VIR_DOMAIN_START_PAUSED)
        logging.info("Virtual machine %s is created from xml", vm.name)
    # context
    try:
        sock = socket.create_connection(('127.0.0.1', 1235), 3)
        try:
            data = {'boot_token': vm.boot_token,
                    'socket': '/var/lib/libvirt/serial/%s' % vm.name}
            # json.dumps emits ASCII by default, so encoding is lossless
            # and gives sendall() bytes on every Python version.
            sock.sendall(json.dumps(data).encode('utf-8'))
        finally:
            # Close the socket even when sending fails.
            sock.close()
    except socket.error:
        logging.error('Unable to connect to context server')
    return vm_xml_dump
tarokkk committed
198 199


200
class shutdown(AbortableTask):
    """ Shutdown virtual machine (need ACPI support).

    Returns once the domain can no longer be looked up, or when the job
    has been aborted.
    This job is abortable:
        AbortableAsyncResult(id="<<jobid>>").abort()

    """
    time_limit = 120

    @req_connection
    def run(self, args):
        """ Request shutdown of the domain named in args and wait for it.

        args must hold exactly one element, the domain name.  Every five
        seconds the domain is looked up again; the method returns when
        libvirt reports VIR_ERR_NO_DOMAIN or when is_aborted() is true.
        Any other libvirt error is re-raised as a plain Exception with
        ``libvirtError = True``.

        """
        from time import sleep
        (name,) = args
        try:
            lookupByName(name).shutdown()
            while True:
                try:
                    Connection.get().lookupByName(name)
                except libvirt.libvirtError as lookup_error:
                    missing = libvirt.VIR_ERR_NO_DOMAIN
                    if lookup_error.get_error_code() != missing:
                        raise
                    # The domain is gone: shutdown has completed.
                    return
                if self.is_aborted():
                    logging.info("Shutdown aborted on vm: %s", name)
                    return
                sleep(5)
        except libvirt.libvirtError as error:
            wrapped = Exception(error.get_error_message())
            wrapped.libvirtError = True
            raise wrapped
232 233 234 235


@celery.task
@req_connection
@wrap_libvirtError
def delete(name):
    """ Destroy the running virtual machine called 'name'. """
    lookupByName(name).destroy()


Guba Sándor committed
243
@celery.task
@req_connection
@wrap_libvirtError
def list_domains():
    """ List the running domains.

    :return list: Names of the domains whose IDs libvirt lists on the host.

    """
    hypervisor = Connection.get()
    return [hypervisor.lookupByID(dom_id).name()
            for dom_id in hypervisor.listDomainsID()]
tarokkk committed
257 258


Guba Sándor committed
259
@celery.task
@req_connection
@wrap_libvirtError
def list_domains_info():
    """ List the running domains with their details.

    :return list: One info dict per domain (see _parse_info), each
                  extended with the domain's 'name'.

    """
    hypervisor = Connection.get()
    domains = (hypervisor.lookupByID(dom_id)
               for dom_id in hypervisor.listDomainsID())
    return [dict(_parse_info(dom.info()), name=dom.name())
            for dom in domains]


@celery.task
@req_connection
@wrap_libvirtError
def lookupByName(name):
    """ Return the domain object libvirt finds under the given name. """
    hypervisor = Connection.get()
    return hypervisor.lookupByName(name)
tarokkk committed
283 284


Guba Sándor committed
285
@celery.task
@req_connection
@wrap_libvirtError
def undefine(name):
    """ Undefine an already defined virtual machine.

    If it's running it becomes transient (lost on reboot).

    """
    lookupByName(name).undefine()


Guba Sándor committed
298
@celery.task
@req_connection
@wrap_libvirtError
def start(name):
    """ Start the already defined virtual machine called 'name'. """
    lookupByName(name).create()


Guba Sándor committed
308
@celery.task
@req_connection
@wrap_libvirtError
def suspend(name):
    """ Stop virtual machine and keep memory in RAM.

    Return the domain info dict read after the suspend call.

    """
    target = lookupByName(name)
    target.suspend()
    current_info = target.info()
    return _parse_info(current_info)
321 322 323 324


@celery.task
@req_connection
@wrap_libvirtError
def save(name, path):
    """ Stop virtual machine 'name' and save its memory to path. """
    lookupByName(name).save(path)


Guba Sándor committed
333
@celery.task
@req_connection
@wrap_libvirtError
def restore(name, path):
    """ Restore a saved virtual machine.

    The virtual machine is restored from the memory image stored at
    path; name is only used to look the domain up afterwards.
    Return the domain info dict.

    """
    hypervisor = Connection.get()
    hypervisor.restore(path)
    return domain_info(name)
Guba Sándor committed
346 347


Guba Sándor committed
348
@celery.task
@req_connection
@wrap_libvirtError
def resume(name):
    """ Resume stopped virtual machines.

    Return the domain info dict read after the resume call.

    """
    target = lookupByName(name)
    target.resume()
    current_info = target.info()
    return _parse_info(current_info)
tarokkk committed
361 362


Guba Sándor committed
363
@celery.task
@req_connection
@wrap_libvirtError
def reset(name):
    """ Reset (power reset) virtual machine.

    Return the domain info dict read after the reset call.

    """
    target = lookupByName(name)
    target.reset(0)
    current_info = target.info()
    return _parse_info(current_info)
tarokkk committed
376 377


Guba Sándor committed
378
@celery.task
@req_connection
@wrap_libvirtError
def reboot(name):
    """ Reboot (with guest acpi support) virtual machine.

    Return the domain info dict read after the reboot call.

    """
    target = lookupByName(name)
    target.reboot(0)
    current_info = target.info()
    return _parse_info(current_info)
390 391


Guba Sándor committed
392
@celery.task
@req_connection
@wrap_libvirtError
def node_info():
    """ Get info from Host as dict.

    The values reported by the connection's getInfo() are paired, in
    order, with these keys:

    model    string indicating the CPU model
    memory   memory size in kilobytes
    cpus     the number of active CPUs
    mhz      expected CPU frequency
    nodes    the number of NUMA cell, 1 for unusual NUMA
             topologies or uniform memory access;
             check capabilities XML for the actual NUMA topology
    sockets  number of CPU sockets per node if nodes > 1,
             1 in case of unusual NUMA topology
    cores    number of cores per socket, total number of
             processors in case of unusual NUMA topology
    threads  number of threads per core, 1 in case of unusual numa topology

    """
    field_names = ('model', 'memory', 'cpus', 'mhz',
                   'nodes', 'sockets', 'cores', 'threads')
    host_values = Connection.get().getInfo()
    return dict(zip(field_names, host_values))


421
def _parse_info(values):
    """ Parse libvirt domain info into dict.

    values is paired positionally with the keys state, maxmem, memory,
    virtcpunum and cputime; the numeric state is then replaced by its
    name from state_dict.

    Return the info dict.

    """
    field_names = ('state', 'maxmem', 'memory', 'virtcpunum', 'cputime')
    parsed = dict(zip(field_names, values))
    # Change state to proper ENUM
    numeric_state = parsed['state']
    parsed['state'] = state_dict[numeric_state]
    return parsed


Guba Sándor committed
435
@celery.task
@req_connection
@wrap_libvirtError
def domain_info(name):
    """ Get the domain info from libvirt.

    Return the domain info dict:
    state       the running state, one of virDomainState
    maxmem      the maximum memory in KBytes allowed
    memory      the memory in KBytes used by the domain
    virtcpunum  the number of virtual CPUs for the domain
    cputime     the CPU time used in nanoseconds

    """
    raw_info = lookupByName(name).info()
    return _parse_info(raw_info)
451 452


Guba Sándor committed
453
@celery.task
@req_connection
@wrap_libvirtError
def network_info(name, network):
    """ Return the network info dict.

    The values of the domain's interfaceStats(network) are paired, in
    order, with these keys:

    rx_bytes
    rx_packets
    rx_errs
    rx_drop
    tx_bytes
    tx_packets
    tx_errs
    tx_drop

    """
    counters = ('bytes', 'packets', 'errs', 'drop')
    field_names = (['rx_' + counter for counter in counters] +
                   ['tx_' + counter for counter in counters])
    stats = lookupByName(name).interfaceStats(network)
    return dict(zip(field_names, stats))


Guba Sándor committed
477
@celery.task
@req_connection
@wrap_libvirtError
def send_key(name, key_code):
    """ Send a linux key_code to the vm called name.

    key_code can be obtained from linux_keys.py
    e.g.: linuxkeys.KEY_RIGHTCTRL

    """
    target = lookupByName(name)
    pressed_keys = [key_code]
    target.sendKey(libvirt.VIR_KEYCODE_SET_LINUX, 100, pressed_keys, 1, 0)


def _stream_handler(stream, buf, opaque):
    """ Write the received chunk buf into opaque; stream is not used.

    Passed to stream.recvAll() by screenshot() together with the
    file-like object that collects the data.

    """
    sink = opaque
    sink.write(buf)
493 494


Guba Sándor committed
495
@celery.task
@req_connection
@wrap_libvirtError
def screenshot(name):
    """Save screenshot of virtual machine.

    Returns a BytesIO object that contains the screenshot in png format.
    The returned buffer is not rewound after the image has been written.

    """
    from io import BytesIO
    from PIL import Image
    # Import linuxkeys to get defines
    import linuxkeys
    # Connection need for the stream object
    target = lookupByName(name)
    # Send key to wake up console
    wake_up_keys = [linuxkeys.KEY_RIGHTCTRL]
    target.sendKey(libvirt.VIR_KEYCODE_SET_LINUX, 100, wake_up_keys, 1, 0)
    # Create Stream to get data
    stream = Connection.get().newStream(0)
    # Take screenshot accessible by stream (return mimetype)
    target.screenshot(stream, 0, 0)
    # Buffer collecting the raw image data (send on AMQP?)
    raw_image = BytesIO()
    try:
        # Save data with handler
        stream.recvAll(_stream_handler, raw_image)
    finally:
        stream.finish()
    # Convert ppm to png: rewind the raw data, open it and re-encode it
    raw_image.seek(0)
    png_image = BytesIO()
    picture = Image.open(raw_image)
    picture.save(png_image, format='PNG')
    return png_image
530 531


Guba Sándor committed
532
@celery.task
@req_connection
@wrap_libvirtError
def migrate(name, host, live=False):
    """ Migrate domain to host.

    The migration is peer-to-peer towards qemu+tcp://<host>/system and
    keeps the domain name; VIR_MIGRATE_LIVE is added when live is true.

    """
    if live:
        flags = libvirt.VIR_MIGRATE_PEER2PEER | libvirt.VIR_MIGRATE_LIVE
    else:
        flags = libvirt.VIR_MIGRATE_PEER2PEER
    target = lookupByName(name)
    destination = "qemu+tcp://" + host + "/system"
    target.migrateToURI(duri=destination, flags=flags,
                        dname=name, bandwidth=0)
546
    # return _parse_info(domain.info())
Őry Máté committed
547

548

Őry Máté committed
549
@celery.task
@req_connection
@wrap_libvirtError
def attach_disk(name, disk):
    """ Attach the disk (serialized VMDisk) to the domain called name. """
    target = lookupByName(name)
    device_xml = VMDisk.deserialize(disk).dump_xml()
    target.attachDevice(device_xml)


@celery.task
@req_connection
@wrap_libvirtError
def detach_disk(name, disk):
    """ Detach the disk (serialized VMDisk) from the domain called name. """
    target = lookupByName(name)
    device_xml = VMDisk.deserialize(disk).dump_xml()
    target.detachDevice(device_xml)


@celery.task
@req_connection
@wrap_libvirtError
def attach_network(name, net):
    """ Attach the interface (serialized VMNetwork) to the domain name.

    The interface XML is written to the log at error level before the
    device is attached.

    """
    target = lookupByName(name)
    interface = VMNetwork.deserialize(net)
    logging.error(interface.dump_xml())
    # dump_xml() is called again for the attach, as before.
    target.attachDevice(interface.dump_xml())


@celery.task
@req_connection
@wrap_libvirtError
def detach_network(name, net):
    """ Detach the interface (serialized VMNetwork) from the domain name. """
    target = lookupByName(name)
    device_xml = VMNetwork.deserialize(net).dump_xml()
    target.detachDevice(device_xml)


@celery.task
@req_connection
@wrap_libvirtError
def resize_disk(name, path, size):
    """ Resize the block device at path of the domain called name.

    size is converted with int() and divided by 1024 before it is handed
    to blockResize with flags=0.
    NOTE(review): presumably size is given in bytes and libvirt expects
    KiB when no flag is passed -- confirm against the libvirt API docs.

    """
    domain = lookupByName(name)
    # domain.blockResize(path, int(size),
    #                    flags=libvirt.VIR_DOMAIN_BLOCK_RESIZE_BYTES)
    # To be compatible with libvirt < 0.9.11
    # Floor division keeps the argument an integer on every Python
    # version; plain "/" yields a float on Python 3.
    domain.blockResize(path, int(size) // 1024, 0)
Guba Sándor committed
595 596 597


@celery.task
def ping():
    """ Always return True; usable to check that the worker answers. """
    alive = True
    return alive
600 601 602


@celery.task
@req_connection
@wrap_libvirtError
def get_architecture():
    """ Return the host CPU architecture from the libvirt capabilities.

    The text of the <arch> element below <host>/<cpu> of the capabilities
    XML is returned, or None when no such element exists.  The element is
    addressed by path rather than by child position, so the result does
    not depend on which other children <host> carries, and the deprecated
    getchildren() call is avoided.

    """
    capabilities = ET.fromstring(Connection.get().getCapabilities())
    return capabilities.findtext('host/cpu/arch')


@celery.task
def get_core_num():
    """ Return psutil's NUM_CPUS value for this host. """
    core_count = NUM_CPUS
    return core_count

Őry Máté committed
615

616 617 618
@celery.task
def get_ram_size():
    """ Return the 'total' field of psutil's virtual_memory(). """
    memory_stats = virtual_memory()
    return memory_stats.total
619 620 621


@celery.task
def get_driver_version():
    """ Describe the git checkout found in the current working directory.

    Return a dict with the keys branch, commit, commit_text and is_dirty,
    taken from the repository and from the first entry of its log.

    """
    from git import Repo
    checkout = Repo(path=os.getcwd())
    latest = checkout.log()[0]
    version = {}
    version['branch'] = checkout.active_branch
    version['commit'] = latest.id_abbrev
    version['commit_text'] = latest.summary
    # The attribute is passed on as is (not called), as before.
    version['is_dirty'] = checkout.is_dirty
    return version


@celery.task
def get_info():
    """ Collect core number, RAM size, architecture and driver version.

    Return a dict with the keys core_num, ram_size, architecture and
    driver_version, each filled by the matching get_* function.

    """
    summary = {}
    summary['core_num'] = get_core_num()
    summary['ram_size'] = get_ram_size()
    summary['architecture'] = get_architecture()
    summary['driver_version'] = get_driver_version()
    return summary
638 639 640


@celery.task
def get_node_metrics():
    """ Return current usage figures of the host.

    The dict holds 'cpu.usage' (psutil cpu_percent(0)) and
    'memory.usage' (the percent field of psutil virtual_memory()).

    """
    return {
        'cpu.usage': cpu_percent(0),
        'memory.usage': virtual_memory().percent,
    }