Commit 0c2edef7 by Guba Sándor

rewrite tasks, add virtioserial support

parent d9e5655a
# from twisted.internet.defer import Deferred # from twisted.internet.defer import Deferred
from twisted.internet import reactor # threads from twisted.internet import reactor # threads
from celery.result import TimeoutError
from celery import Celery from celery import Celery
from kombu import Queue, Exchange from kombu import Queue, Exchange
from os import getenv from os import getenv
from socket import gethostname from socket import gethostname
from threading import Event
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
...@@ -21,98 +23,100 @@ celery.conf.update(CELERY_CACHE_BACKEND=CACHE_URI, ...@@ -21,98 +23,100 @@ celery.conf.update(CELERY_CACHE_BACKEND=CACHE_URI,
routing_key='agent'), )) routing_key='agent'), ))
def send_command(vm, command, *args, **kwargs):
uuid = kwargs.get('uuid', None)
timeout = kwargs.get('timeout', 10)
if uuid:
event = Event()
reactor.running_tasks[vm][uuid] = event
reactor.ended_tasks[vm][uuid] = None
for conn in reactor.connections[vm]:
logger.info('%s(%s, %s)', command, vm,
', '.join(map(lambda x: str(x)[:100], kwargs.values())))
conn.send_command(command=command, args=kwargs)
if uuid:
success = event.wait(timeout)
retval = reactor.ended_tasks[vm][uuid]
del reactor.ended_tasks[vm][uuid]
del reactor.running_tasks[vm][uuid]
if not success:
raise TimeoutError()
return retval
@celery.task(name='agent.change_password') @celery.task(name='agent.change_password')
def change_password(vm, password): def change_password(vm, password):
reactor.connections[vm].send_command(command='change_password', send_command(vm, command='change_password', password=password)
args={'password':
password})
logger.debug('change_password(%s,%s)', vm, password)
@celery.task(name='agent.set_hostname') @celery.task(name='agent.set_hostname')
def set_hostname(vm, hostname): def set_hostname(vm, hostname):
reactor.connections[vm].send_command(command='set_hostname', send_command(vm, command='set_hostname', hostname=hostname)
args={'hostname':
hostname})
logger.debug('set_hostname(%s,%s)', vm, hostname)
@celery.task(name='agent.restart_networking') @celery.task(name='agent.restart_networking')
def restart_networking(vm): def restart_networking(vm):
reactor.connections[vm].send_command(command='restart_networking', send_command(vm, command='restart_networking')
args={})
logger.debug('restart_networking(%s)', vm)
@celery.task(name='agent.set_time') @celery.task(name='agent.set_time')
def set_time(vm, time): def set_time(vm, time):
reactor.connections[vm].send_command(command='set_time', send_command(vm, command='set_time', time=time)
args={'time': time})
logger.debug('set_time(%s,%s)', vm, time)
@celery.task(name='agent.mount_store') @celery.task(name='agent.mount_store')
def mount_store(vm, host, username, password): def mount_store(vm, host, username, password):
reactor.connections[vm].send_command(command='mount_store', send_command(vm, command='mount_store', host=host,
args={'host': host, username=username, password=password)
'username': username,
'password': password})
logger.debug('mount_store(%s,%s,%s)', vm, host, username)
@celery.task(name='agent.cleanup') @celery.task(name='agent.cleanup')
def cleanup(vm): def cleanup(vm):
reactor.connections[vm].send_command(command='cleanup', args={}) send_command(vm, command='cleanup')
logger.debug('cleanup(%s)', vm)
@celery.task(name='agent.start_access_server') @celery.task(name='agent.start_access_server')
def start_access_server(vm): def start_access_server(vm):
reactor.connections[vm].send_command( send_command(vm, command='start_access_server')
command='start_access_server', args={})
logger.debug('start_access_server(%s)', vm)
@celery.task(name='agent.update') @celery.task(name='agent.update')
def update(vm, data): def update(vm, data, executable=None):
logger.debug('update(%s)', vm) kwargs = {'command': 'update', 'data': data, 'uuid': update.request.id}
return reactor.connections[vm].send_command( if executable is not None:
command='update', args={'data': data}, uuid=update.request.id) kwargs['executable'] = executable
return send_command(vm, **kwargs)
@celery.task(name='agent.add_keys') @celery.task(name='agent.add_keys')
def add_keys(vm, keys): def add_keys(vm, keys):
logger.debug('add_keys(%s, %s)', vm, keys) send_command(vm, command='add_keys', keys=keys)
reactor.connections[vm].send_command(
command='add_keys', args={'keys': keys})
@celery.task(name='agent.del_keys') @celery.task(name='agent.del_keys')
def del_keys(vm, keys): def del_keys(vm, keys):
logger.debug('del_keys(%s, %s)', vm, keys) send_command(vm, command='del_keys', keys=keys)
reactor.connections[vm].send_command(
command='del_keys', args={'keys': keys})
@celery.task(name='agent.get_keys') @celery.task(name='agent.get_keys')
def get_keys(vm): def get_keys(vm):
logger.debug('get_keys(%s)', vm) return send_command(vm, command='get_keys')
return reactor.connections[vm].send_command(
command='get_keys', args={}, uuid=get_keys.request.id)
@celery.task(name='agent.send_expiration') @celery.task(name='agent.send_expiration')
def send_expiration(vm, url): def send_expiration(vm, url):
logger.debug('send_expiration(%s, %s)', vm, url) return send_command(vm, command='send_expiration',
return reactor.connections[vm].send_command( url=url)
command='send_expiration', args={'url': url})
@celery.task(name='agent.change_ip') @celery.task(name='agent.change_ip')
def change_ip(vm, interfaces, dns): def change_ip(vm, interfaces, dns):
logger.debug('change_ip(%s, %s, %s)', vm, interfaces, dns) send_command(vm, command='change_ip', interfaces=interfaces, dns=dns)
return reactor.connections[vm].send_command(
command='change_ip', args={'interfaces': interfaces, 'dns': dns})
@celery.task(name='vm.tasks.local_agent_tasks.renew') @celery.task(name='vm.tasks.local_agent_tasks.renew')
......
...@@ -25,6 +25,8 @@ Worker.install_platform_tweaks = install_platform_tweaks ...@@ -25,6 +25,8 @@ Worker.install_platform_tweaks = install_platform_tweaks
def reactor_started(): def reactor_started():
reactor.running_tasks = {}
reactor.ended_tasks = {}
for f in listdir(SOCKET_DIR): for f in listdir(SOCKET_DIR):
f = path.join(SOCKET_DIR, f) f = path.join(SOCKET_DIR, f)
inotify_handler(None, filepath.FilePath(f), None) inotify_handler(None, filepath.FilePath(f), None)
......
...@@ -5,9 +5,7 @@ import pickle ...@@ -5,9 +5,7 @@ import pickle
import logging import logging
import time import time
import struct import struct
from threading import Event
from os import getenv from os import getenv
from celery.result import TimeoutError
from utils import SerialLineReceiverBase from utils import SerialLineReceiverBase
...@@ -42,9 +40,10 @@ class GraphiteClientFactory(protocol.ClientFactory): ...@@ -42,9 +40,10 @@ class GraphiteClientFactory(protocol.ClientFactory):
def inotify_handler(self, file, mask): def inotify_handler(self, file, mask):
vm = file.basename() vm = file.basename().replace('vio-', '')
logger.info('inotify: %s' % vm) logger.info('inotify: %s (%s)', vm, file.path)
if vm in reactor.connections: for conn in reactor.connections.get(vm, []):
if file.path == conn.transport.addr:
return return
serial = SerialLineReceiverFactory(vm) serial = SerialLineReceiverFactory(vm)
logger.info("connecting to %s (%s)", vm, file.path) logger.info("connecting to %s (%s)", vm, file.path)
...@@ -76,47 +75,29 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -76,47 +75,29 @@ class SerialLineReceiver(SerialLineReceiverBase):
args=args) args=args)
def handle_response(self, response, args): def handle_response(self, response, args):
vm = self.factory.vm
if response == 'status': if response == 'status':
self.send_to_graphite(args) self.send_to_graphite(args)
else: else:
uuid = args.get('uuid', None) uuid = args.get('uuid', None)
if not uuid: if not uuid:
return return
event = self.factory.running_tasks.get(uuid, None) event = reactor.running_tasks[vm].get(uuid, None)
if event: if event:
self.factory.ended_tasks[uuid] = args reactor.ended_tasks[vm][uuid] = args
event.set() event.set()
def connectionMade(self): def connectionMade(self):
logger.info("connected to %s" % self.factory.vm) logger.info("connected to %s (%s)", self.factory.vm,
reactor.connections[self.factory.vm] = self self.transport.addr)
if self.factory.vm not in reactor.connections:
reactor.connections[self.factory.vm] = set()
reactor.connections[self.factory.vm].add(self)
def connectionLost(self, reason): def connectionLost(self, reason):
logger.info("disconnected from %s" % self.factory.vm) logger.info("disconnected from %s (%s)", self.factory.vm,
del reactor.connections[self.factory.vm] self.transport.addr)
reactor.connections[self.factory.vm].remove(self)
def send_command(self, command, args, timeout=10.0, uuid=None):
if not uuid:
super(SerialLineReceiver, self).send_command(command, args)
return
event = Event()
args['uuid'] = uuid
self.factory.running_tasks[uuid] = event
self.factory.ended_tasks[uuid] = None
super(SerialLineReceiver, self).send_command(command, args)
success = event.wait(timeout)
retval = self.factory.ended_tasks[uuid]
del self.factory.ended_tasks[uuid]
del self.factory.running_tasks[uuid]
if not success:
raise TimeoutError()
return retval
class SerialLineReceiverFactory(protocol.ClientFactory): class SerialLineReceiverFactory(protocol.ClientFactory):
...@@ -124,5 +105,7 @@ class SerialLineReceiverFactory(protocol.ClientFactory): ...@@ -124,5 +105,7 @@ class SerialLineReceiverFactory(protocol.ClientFactory):
def __init__(self, vm): def __init__(self, vm):
self.vm = vm self.vm = vm
self.running_tasks = {} if vm not in reactor.running_tasks:
self.ended_tasks = {} reactor.running_tasks[vm] = {}
if vm not in reactor.ended_tasks:
reactor.ended_tasks[vm] = {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment