Commit 3b828db2 by Bach Dániel

handle responses from agent

parent c2d8f7e5
...@@ -5,7 +5,9 @@ import pickle ...@@ -5,7 +5,9 @@ import pickle
import logging import logging
import time import time
import struct import struct
from threading import Event
from os import getenv from os import getenv
from celery.result import TimeoutError
from utils import SerialLineReceiverBase from utils import SerialLineReceiverBase
...@@ -62,8 +64,9 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -62,8 +64,9 @@ class SerialLineReceiver(SerialLineReceiverBase):
agent_stopped.apply_async(queue='localhost.man', agent_stopped.apply_async(queue='localhost.man',
args=(self.factory.vm, )) args=(self.factory.vm, ))
if command == 'agent_started': if command == 'agent_started':
version = args.get('version', None)
agent_started.apply_async(queue='localhost.man', agent_started.apply_async(queue='localhost.man',
args=(self.factory.vm, )) args=(self.factory.vm, version))
if command == 'ping': if command == 'ping':
self.send_response(response='pong', self.send_response(response='pong',
args=args) args=args)
...@@ -71,6 +74,14 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -71,6 +74,14 @@ class SerialLineReceiver(SerialLineReceiverBase):
def handle_response(self, response, args): def handle_response(self, response, args):
if response == 'status': if response == 'status':
self.send_to_graphite(args) self.send_to_graphite(args)
else:
uuid = args.get('uuid', None)
if not uuid:
return
event = self.factory.running_tasks.get(uuid, None)
if event:
self.factory.ended_tasks[uuid] = args
event.set()
def connectionMade(self): def connectionMade(self):
logger.info("connected to %s" % self.factory.vm) logger.info("connected to %s" % self.factory.vm)
...@@ -80,9 +91,34 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -80,9 +91,34 @@ class SerialLineReceiver(SerialLineReceiverBase):
logger.info("disconnected from %s" % self.factory.vm) logger.info("disconnected from %s" % self.factory.vm)
del reactor.connections[self.factory.vm] del reactor.connections[self.factory.vm]
def send_command(self, command, args, timeout=10.0, uuid=None):
if not uuid:
super(SerialLineReceiver, self).send_command(command, args)
return
event = Event()
args['uuid'] = uuid
self.factory.running_tasks[uuid] = event
self.factory.ended_tasks[uuid] = None
super(SerialLineReceiver, self).send_command(command, args)
success = event.wait(timeout)
retval = self.factory.ended_tasks[uuid]
del self.factory.ended_tasks[uuid]
del self.factory.running_tasks[uuid]
if not success:
raise TimeoutError()
return retval
class SerialLineReceiverFactory(protocol.ClientFactory): class SerialLineReceiverFactory(protocol.ClientFactory):
protocol = SerialLineReceiver protocol = SerialLineReceiver
def __init__(self, vm): def __init__(self, vm):
self.vm = vm self.vm = vm
self.running_tasks = {}
self.ended_tasks = {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment