Commit 89e2fcba by Bach Dániel

Merge branch 'virtio' into 'master'

Virtio

See merge request !1
parents 942d2c17 0c2edef7
# from twisted.internet.defer import Deferred # from twisted.internet.defer import Deferred
from twisted.internet import reactor # threads from twisted.internet import reactor # threads
from celery.result import TimeoutError
from celery import Celery from celery import Celery
from kombu import Queue, Exchange from kombu import Queue, Exchange
from os import getenv from os import getenv
from socket import gethostname from socket import gethostname
from threading import Event
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger()
HOSTNAME = gethostname() HOSTNAME = gethostname()
AMQP_URI = getenv('AMQP_URI') AMQP_URI = getenv('AMQP_URI')
...@@ -21,98 +23,100 @@ celery.conf.update(CELERY_CACHE_BACKEND=CACHE_URI, ...@@ -21,98 +23,100 @@ celery.conf.update(CELERY_CACHE_BACKEND=CACHE_URI,
routing_key='agent'), )) routing_key='agent'), ))
def send_command(vm, command, *args, **kwargs):
uuid = kwargs.get('uuid', None)
timeout = kwargs.get('timeout', 10)
if uuid:
event = Event()
reactor.running_tasks[vm][uuid] = event
reactor.ended_tasks[vm][uuid] = None
for conn in reactor.connections[vm]:
logger.info('%s(%s, %s)', command, vm,
', '.join(map(lambda x: str(x)[:100], kwargs.values())))
conn.send_command(command=command, args=kwargs)
if uuid:
success = event.wait(timeout)
retval = reactor.ended_tasks[vm][uuid]
del reactor.ended_tasks[vm][uuid]
del reactor.running_tasks[vm][uuid]
if not success:
raise TimeoutError()
return retval
@celery.task(name='agent.change_password') @celery.task(name='agent.change_password')
def change_password(vm, password): def change_password(vm, password):
reactor.connections[vm].send_command(command='change_password', send_command(vm, command='change_password', password=password)
args={'password':
password})
logger.debug('change_password(%s,%s)', vm, password)
@celery.task(name='agent.set_hostname') @celery.task(name='agent.set_hostname')
def set_hostname(vm, hostname): def set_hostname(vm, hostname):
reactor.connections[vm].send_command(command='set_hostname', send_command(vm, command='set_hostname', hostname=hostname)
args={'hostname':
hostname})
logger.debug('set_hostname(%s,%s)', vm, hostname)
@celery.task(name='agent.restart_networking') @celery.task(name='agent.restart_networking')
def restart_networking(vm): def restart_networking(vm):
reactor.connections[vm].send_command(command='restart_networking', send_command(vm, command='restart_networking')
args={})
logger.debug('restart_networking(%s)', vm)
@celery.task(name='agent.set_time') @celery.task(name='agent.set_time')
def set_time(vm, time): def set_time(vm, time):
reactor.connections[vm].send_command(command='set_time', send_command(vm, command='set_time', time=time)
args={'time': time})
logger.debug('set_time(%s,%s)', vm, time)
@celery.task(name='agent.mount_store') @celery.task(name='agent.mount_store')
def mount_store(vm, host, username, password): def mount_store(vm, host, username, password):
reactor.connections[vm].send_command(command='mount_store', send_command(vm, command='mount_store', host=host,
args={'host': host, username=username, password=password)
'username': username,
'password': password})
logger.debug('mount_store(%s,%s,%s)', vm, host, username)
@celery.task(name='agent.cleanup') @celery.task(name='agent.cleanup')
def cleanup(vm): def cleanup(vm):
reactor.connections[vm].send_command(command='cleanup', args={}) send_command(vm, command='cleanup')
logger.debug('cleanup(%s)', vm)
@celery.task(name='agent.start_access_server') @celery.task(name='agent.start_access_server')
def start_access_server(vm): def start_access_server(vm):
reactor.connections[vm].send_command( send_command(vm, command='start_access_server')
command='start_access_server', args={})
logger.debug('start_access_server(%s)', vm)
@celery.task(name='agent.update') @celery.task(name='agent.update')
def update(vm, data): def update(vm, data, executable=None):
logger.debug('update(%s)', vm) kwargs = {'command': 'update', 'data': data, 'uuid': update.request.id}
return reactor.connections[vm].send_command( if executable is not None:
command='update', args={'data': data}, uuid=update.request.id) kwargs['executable'] = executable
return send_command(vm, **kwargs)
@celery.task(name='agent.add_keys') @celery.task(name='agent.add_keys')
def add_keys(vm, keys): def add_keys(vm, keys):
logger.debug('add_keys(%s, %s)', vm, keys) send_command(vm, command='add_keys', keys=keys)
reactor.connections[vm].send_command(
command='add_keys', args={'keys': keys})
@celery.task(name='agent.del_keys') @celery.task(name='agent.del_keys')
def del_keys(vm, keys): def del_keys(vm, keys):
logger.debug('del_keys(%s, %s)', vm, keys) send_command(vm, command='del_keys', keys=keys)
reactor.connections[vm].send_command(
command='del_keys', args={'keys': keys})
@celery.task(name='agent.get_keys') @celery.task(name='agent.get_keys')
def get_keys(vm): def get_keys(vm):
logger.debug('get_keys(%s)', vm) return send_command(vm, command='get_keys')
return reactor.connections[vm].send_command(
command='get_keys', args={}, uuid=get_keys.request.id)
@celery.task(name='agent.send_expiration') @celery.task(name='agent.send_expiration')
def send_expiration(vm, url): def send_expiration(vm, url):
logger.debug('send_expiration(%s, %s)', vm, url) return send_command(vm, command='send_expiration',
return reactor.connections[vm].send_command( url=url)
command='send_expiration', args={'url': url})
@celery.task(name='agent.change_ip') @celery.task(name='agent.change_ip')
def change_ip(vm, interfaces, dns): def change_ip(vm, interfaces, dns):
logger.debug('change_ip(%s, %s, %s)', vm, interfaces, dns) send_command(vm, command='change_ip', interfaces=interfaces, dns=dns)
return reactor.connections[vm].send_command(
command='change_ip', args={'interfaces': interfaces, 'dns': dns})
@celery.task(name='vm.tasks.local_agent_tasks.renew') @celery.task(name='vm.tasks.local_agent_tasks.renew')
......
...@@ -3,9 +3,14 @@ from twisted.internet import reactor, inotify ...@@ -3,9 +3,14 @@ from twisted.internet import reactor, inotify
from twisted.python import filepath from twisted.python import filepath
from agentcelery import celery, HOSTNAME from agentcelery import celery, HOSTNAME
from protocol import inotify_handler from protocol import inotify_handler
from os import getenv, listdir, path from os import getenv, listdir, path, environ
import logging import logging
logging.basicConfig()
logger = logging.getLogger()
level = environ.get('LOGLEVEL', 'INFO')
logger.setLevel(level)
SOCKET_DIR = getenv('SOCKET_DIR', '/var/lib/libvirt/serial') SOCKET_DIR = getenv('SOCKET_DIR', '/var/lib/libvirt/serial')
...@@ -20,6 +25,8 @@ Worker.install_platform_tweaks = install_platform_tweaks ...@@ -20,6 +25,8 @@ Worker.install_platform_tweaks = install_platform_tweaks
def reactor_started(): def reactor_started():
reactor.running_tasks = {}
reactor.ended_tasks = {}
for f in listdir(SOCKET_DIR): for f in listdir(SOCKET_DIR):
f = path.join(SOCKET_DIR, f) f = path.join(SOCKET_DIR, f)
inotify_handler(None, filepath.FilePath(f), None) inotify_handler(None, filepath.FilePath(f), None)
...@@ -33,7 +40,7 @@ def main(): ...@@ -33,7 +40,7 @@ def main():
w = Worker(app=celery, concurrency=1, w = Worker(app=celery, concurrency=1,
pool_cls='threads', pool_cls='threads',
hostname=HOSTNAME + '.agentdriver', hostname=HOSTNAME + '.agentdriver',
loglevel=logging.DEBUG) loglevel=level)
reactor.callInThread(w.run) reactor.callInThread(w.run)
notifier = inotify.INotify(reactor) notifier = inotify.INotify(reactor)
notifier.startReading() notifier.startReading()
......
...@@ -5,15 +5,13 @@ import pickle ...@@ -5,15 +5,13 @@ import pickle
import logging import logging
import time import time
import struct import struct
from threading import Event
from os import getenv from os import getenv
from celery.result import TimeoutError
from utils import SerialLineReceiverBase from utils import SerialLineReceiverBase
from agentcelery import agent_started, agent_stopped, renew from agentcelery import agent_started, agent_stopped, renew
logger = logging.getLogger(__name__) logger = logging.getLogger()
reactor.connections = {} reactor.connections = {}
...@@ -42,11 +40,13 @@ class GraphiteClientFactory(protocol.ClientFactory): ...@@ -42,11 +40,13 @@ class GraphiteClientFactory(protocol.ClientFactory):
def inotify_handler(self, file, mask): def inotify_handler(self, file, mask):
vm = file.basename() vm = file.basename().replace('vio-', '')
logger.info('inotify: %s' % vm) logger.info('inotify: %s (%s)', vm, file.path)
if vm in reactor.connections: for conn in reactor.connections.get(vm, []):
return if file.path == conn.transport.addr:
return
serial = SerialLineReceiverFactory(vm) serial = SerialLineReceiverFactory(vm)
logger.info("connecting to %s (%s)", vm, file.path)
reactor.connectUNIX(file.path, serial) reactor.connectUNIX(file.path, serial)
...@@ -75,47 +75,29 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -75,47 +75,29 @@ class SerialLineReceiver(SerialLineReceiverBase):
args=args) args=args)
def handle_response(self, response, args): def handle_response(self, response, args):
vm = self.factory.vm
if response == 'status': if response == 'status':
self.send_to_graphite(args) self.send_to_graphite(args)
else: else:
uuid = args.get('uuid', None) uuid = args.get('uuid', None)
if not uuid: if not uuid:
return return
event = self.factory.running_tasks.get(uuid, None) event = reactor.running_tasks[vm].get(uuid, None)
if event: if event:
self.factory.ended_tasks[uuid] = args reactor.ended_tasks[vm][uuid] = args
event.set() event.set()
def connectionMade(self): def connectionMade(self):
logger.info("connected to %s" % self.factory.vm) logger.info("connected to %s (%s)", self.factory.vm,
reactor.connections[self.factory.vm] = self self.transport.addr)
if self.factory.vm not in reactor.connections:
reactor.connections[self.factory.vm] = set()
reactor.connections[self.factory.vm].add(self)
def connectionLost(self, reason): def connectionLost(self, reason):
logger.info("disconnected from %s" % self.factory.vm) logger.info("disconnected from %s (%s)", self.factory.vm,
del reactor.connections[self.factory.vm] self.transport.addr)
reactor.connections[self.factory.vm].remove(self)
def send_command(self, command, args, timeout=10.0, uuid=None):
if not uuid:
super(SerialLineReceiver, self).send_command(command, args)
return
event = Event()
args['uuid'] = uuid
self.factory.running_tasks[uuid] = event
self.factory.ended_tasks[uuid] = None
super(SerialLineReceiver, self).send_command(command, args)
success = event.wait(timeout)
retval = self.factory.ended_tasks[uuid]
del self.factory.ended_tasks[uuid]
del self.factory.running_tasks[uuid]
if not success:
raise TimeoutError()
return retval
class SerialLineReceiverFactory(protocol.ClientFactory): class SerialLineReceiverFactory(protocol.ClientFactory):
...@@ -123,5 +105,7 @@ class SerialLineReceiverFactory(protocol.ClientFactory): ...@@ -123,5 +105,7 @@ class SerialLineReceiverFactory(protocol.ClientFactory):
def __init__(self, vm): def __init__(self, vm):
self.vm = vm self.vm = vm
self.running_tasks = {} if vm not in reactor.running_tasks:
self.ended_tasks = {} reactor.running_tasks[vm] = {}
if vm not in reactor.ended_tasks:
reactor.ended_tasks[vm] = {}
...@@ -2,8 +2,7 @@ from twisted.protocols.basic import LineReceiver ...@@ -2,8 +2,7 @@ from twisted.protocols.basic import LineReceiver
import json import json
import logging import logging
root_logger = logging.getLogger() logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
class SerialLineReceiverBase(LineReceiver, object): class SerialLineReceiverBase(LineReceiver, object):
...@@ -31,14 +30,14 @@ class SerialLineReceiverBase(LineReceiver, object): ...@@ -31,14 +30,14 @@ class SerialLineReceiverBase(LineReceiver, object):
args = {} args = {}
command = data.get('command', None) command = data.get('command', None)
response = data.get('response', None) response = data.get('response', None)
logging.debug('[serial] valid json: %s' % (data, )) logger.debug('[serial] valid json: %s' % (data, ))
except (ValueError, KeyError) as e: except (ValueError, KeyError) as e:
logging.error('[serial] invalid json: %s (%s)' % (data, e)) logger.error('[serial] invalid json: %s (%s)' % (data, e))
return return
if command is not None and isinstance(command, unicode): if command is not None and isinstance(command, unicode):
logging.debug('received command: %s (%s)' % (command, args)) logger.debug('received command: %s (%s)' % (command, args))
self.handle_command(command, args) self.handle_command(command, args)
elif response is not None and isinstance(response, unicode): elif response is not None and isinstance(response, unicode):
logging.debug('received reply: %s (%s)' % (response, args)) logger.debug('received reply: %s (%s)' % (response, args))
self.handle_response(response, args) self.handle_response(response, args)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment