Commit 0a1798be by Szeberényi Imre

python 3.6 on mega7

parent 859ecd46
...@@ -14,8 +14,9 @@ HOSTNAME = gethostname().split('.')[0] ...@@ -14,8 +14,9 @@ HOSTNAME = gethostname().split('.')[0]
AMQP_URI = getenv('AMQP_URI') AMQP_URI = getenv('AMQP_URI')
celery = Celery('agent', broker=AMQP_URI) celery = Celery('agent', broker=AMQP_URI)
celery.conf.update(CELERY_RESULT_BACKEND='amqp://', celery.conf.update(CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=30000, CELERY_TASK_RESULT_EXPIRES=300,
CELERYD_PREFETCH_MULTIPLIER=32,
CELERY_QUEUES=(Queue(HOSTNAME + '.agent', CELERY_QUEUES=(Queue(HOSTNAME + '.agent',
Exchange('agent', type='direct'), Exchange('agent', type='direct'),
routing_key='agent'), )) routing_key='agent'), ))
...@@ -33,7 +34,7 @@ def send_command(vm, command, *args, **kwargs): ...@@ -33,7 +34,7 @@ def send_command(vm, command, *args, **kwargs):
if command == 'append' and 'vio-cloud' not in conn.transport.addr: if command == 'append' and 'vio-cloud' not in conn.transport.addr:
continue continue
logger.info('%s(%s, %s)', command, vm, logger.info('%s(%s, %s)', command, vm,
', '.join([str(x)[:100] for x in list(kwargs.values())])) ', '.join(map(lambda x: str(x)[:100], kwargs.values())))
conn.send_command(command=command, args=kwargs) conn.send_command(command=command, args=kwargs)
if uuid: if uuid:
......
...@@ -13,7 +13,7 @@ level = environ.get('LOGLEVEL', 'INFO') ...@@ -13,7 +13,7 @@ level = environ.get('LOGLEVEL', 'INFO')
logger.setLevel(level) logger.setLevel(level)
SOCKET_DIR = getenv('SOCKET_DIR', '/var/lib/libvirt/') SOCKET_DIR = getenv('SOCKET_DIR', '/var/lib/libvirt/serial')
old_install_platform_tweaks = Worker.install_platform_tweaks old_install_platform_tweaks = Worker.install_platform_tweaks
...@@ -26,6 +26,7 @@ Worker.install_platform_tweaks = install_platform_tweaks ...@@ -26,6 +26,7 @@ Worker.install_platform_tweaks = install_platform_tweaks
def reactor_started(): def reactor_started():
logger.info("reactor_started")
reactor.running_tasks = {} reactor.running_tasks = {}
reactor.ended_tasks = {} reactor.ended_tasks = {}
for f in listdir(SOCKET_DIR): for f in listdir(SOCKET_DIR):
......
...@@ -10,8 +10,8 @@ setuid cloud ...@@ -10,8 +10,8 @@ setuid cloud
script script
cd /home/kohlkriszto/Documents/agentdriver cd /home/cloud/agentdriver
. /home/kohlkriszto/Documents/agentdriver_venv/bin/activate . /home/cloud/.virtualenvs/agentdriver/local/bin/activate
. /home/kohlkriszto/Documents/agentdriver_venv/bin/postactivate . /home/cloud/.virtualenvs/agentdriver/local/bin/postactivate
python agentdriver.py python agentdriver.py
end script end script
#!/usr/bin/env python #!/usr/bin/env python
from twisted.internet import protocol, reactor from twisted.internet import protocol, reactor, inotify
import pickle import pickle
import logging import logging
import time import time
import struct import struct
from os import getenv from os import getenv
import gc
from utils import SerialLineReceiverBase from utils import SerialLineReceiverBase
...@@ -15,15 +16,22 @@ logger = logging.getLogger() ...@@ -15,15 +16,22 @@ logger = logging.getLogger()
reactor.connections = {} reactor.connections = {}
def numObjsByName(name):
num = 0
for ob in gc.get_objects():
if isinstance(ob, name):
num += 1
return num
class GraphiteClientProtocol(protocol.Protocol): class GraphiteClientProtocol(protocol.Protocol):
def connectionMade(self): def connectionMade(self):
logger.info("Monitor connection %s", self.name)
timestamp = time.time() timestamp = time.time()
data_list = [] data_list = []
for key, value in list(self.data.items()): for key, value in self.data.items():
if not isinstance(value, dict): if not isinstance(value, dict):
continue continue
for k, v in list(value.items()): for k, v in value.items():
data_list.append(('agent.%s.%s.%s' % (self.name, key, k), data_list.append(('agent.%s.%s.%s' % (self.name, key, k),
(timestamp, float(v)))) (timestamp, float(v))))
...@@ -41,18 +49,38 @@ class GraphiteClientFactory(protocol.ClientFactory): ...@@ -41,18 +49,38 @@ class GraphiteClientFactory(protocol.ClientFactory):
def inotify_handler(self, file, mask): def inotify_handler(self, file, mask):
file = file.asTextMode(encoding='utf-8')
if file.basename().startswith('cloud'):
return
vm = file.basename().replace('vio-', '') vm = file.basename().replace('vio-', '')
logger.info('inotify: %s (%s)', vm, file.path) logger.info('inotify: %s (%s)', vm, file.path)
for conn in reactor.connections.get(vm, []): if mask:
if file.path == conn.transport.addr: logger.info("event %s (%s) on %s" % (', '.join(inotify.humanReadableMask(mask)), mask, file))
if vm in reactor.running_tasks:
for addr in reactor.running_tasks[vm].get('started', None).copy() :
if file.path == addr:
if mask and mask == inotify.IN_DELETE and reactor.running_tasks[vm]['started'][addr]:
_p = reactor.running_tasks[vm]['started'][addr]
logger.info("DELETE %s", _p)
logger.info('NumOBJSerialLineReceiverFactory1: %s' , numObjsByName(SerialLineReceiverFactory))
del _p
reactor.running_tasks[vm]['started'].pop(addr)
logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory))
logger.info('reacror_running1 %s', reactor.running_tasks)
return
elif reactor.running_tasks[vm]['started'][addr]:
return return
serial = SerialLineReceiverFactory(vm) serial = SerialLineReceiverFactory(vm)
logger.info("connecting to %s (%s)", vm, file.path) logger.info("connecting to %s (%s)", vm, file.path)
reactor.connectUNIX(file.path, serial) ic = reactor.connectUNIX(file.path, serial, 10)
logger.info('IConnector state: %s', ic.state)
logger.info('reacror_running2 %s', reactor.running_tasks)
# logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory))
class SerialLineReceiver(SerialLineReceiverBase): class SerialLineReceiver(SerialLineReceiverBase):
def send_to_graphite(self, data): def send_to_graphite(self, data):
logger.info("Send_TO_Graphite")
client = GraphiteClientFactory() client = GraphiteClientFactory()
client.protocol.data = data client.protocol.data = data
client.protocol.name = self.factory.vm client.protocol.name = self.factory.vm
...@@ -61,6 +89,7 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -61,6 +89,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
client) client)
def handle_command(self, command, args): def handle_command(self, command, args):
logger.info('serial_command: %s %s', command, args)
if command == 'agent_stopped': if command == 'agent_stopped':
agent_stopped.apply_async(queue='localhost.man', agent_stopped.apply_async(queue='localhost.man',
args=(self.factory.vm, )) args=(self.factory.vm, ))
...@@ -77,6 +106,7 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -77,6 +106,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
args=args) args=args)
def handle_response(self, response, args): def handle_response(self, response, args):
logger.info('handle_reponse: %s %s', response, args)
vm = self.factory.vm vm = self.factory.vm
if response == 'status': if response == 'status':
self.send_to_graphite(args) self.send_to_graphite(args)
...@@ -89,17 +119,25 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -89,17 +119,25 @@ class SerialLineReceiver(SerialLineReceiverBase):
reactor.ended_tasks[vm][uuid] = args reactor.ended_tasks[vm][uuid] = args
event.set() event.set()
def connectionMade(self): def connectionMade(self):
logger.info("connected to %s (%s)", self.factory.vm, logger.info("connected to %s (%s)", self.factory.vm, self.transport.addr)
self.transport.addr) logger.info("reactor connections: %s", reactor.connections)
if self.factory.vm not in reactor.connections: if self.factory.vm not in reactor.connections:
reactor.connections[self.factory.vm] = set() reactor.connections[self.factory.vm] = set()
logger.info("reactor connections factory: %s", reactor.connections[self.factory.vm])
reactor.connections[self.factory.vm].add(self) reactor.connections[self.factory.vm].add(self)
logger.info('NumOBJSerialLineReceiver: %s' , numObjsByName(SerialLineReceiver))
def connectionLost(self, reason): def connectionLost(self, reason):
logger.info("disconnected from %s (%s)", self.factory.vm, logger.info("disconnected from %s (%s)", self.factory.vm, self.transport.addr)
self.transport.addr)
reactor.connections[self.factory.vm].remove(self) reactor.connections[self.factory.vm].remove(self)
vm = self.factory.vm
# for addr in reactor.running_tasks[vm].get('started', None):
# if addr == self.transport.addr :
# reactor.running_tasks[vm]['started'][addr] = None
logger.info("active connetions: %s", reactor.running_tasks[vm])
class SerialLineReceiverFactory(protocol.ClientFactory): class SerialLineReceiverFactory(protocol.ClientFactory):
...@@ -111,3 +149,36 @@ class SerialLineReceiverFactory(protocol.ClientFactory): ...@@ -111,3 +149,36 @@ class SerialLineReceiverFactory(protocol.ClientFactory):
reactor.running_tasks[vm] = {} reactor.running_tasks[vm] = {}
if vm not in reactor.ended_tasks: if vm not in reactor.ended_tasks:
reactor.ended_tasks[vm] = {} reactor.ended_tasks[vm] = {}
def startedConnecting(self, connector):
vm = self.vm
addr = connector.address
logger.info("startedConnecting to %s (%s)", vm, addr)
logger.info("started connetions: %s", reactor.running_tasks[vm])
if not reactor.running_tasks[vm].get('started', None):
reactor.running_tasks[vm]['started'] = {}
reactor.running_tasks[vm]['started'][addr] = self
logger.info('NumOBJSerialLineReceiverFactory: %s' , numObjsByName(SerialLineReceiverFactory))
logger.info('NumOBJSerialLineReceiver: %s' , numObjsByName(SerialLineReceiver))
logger.info("NumConnetions: %s", reactor.running_tasks)
def clientrConnectionLost(self, connector):
vm = self.vm
addr = connector.address
logger.info("clientConnectionLost with %s (%s)", vm, addr)
for _addr in reactor.running_tasks[vm].get('started', None):
if _addr == addr:
reactor.running_tasks[vm]['started'].pop(addr)
logger.info("active connetions: %s", reactor.running_tasks[vm])
def clientrConnectionFailed(self, connector, reason):
vm = self.vm
addr = connector.address
logger.info("clientConnectionFailed with %s (%s)", vm, connector.addr)
for _addr in reactor.running_tasks[vm].get('started', None):
if _addr == addr:
reactor.running_tasks[vm]['started'].pop(addr)
logger.info("active connetions: %s", reactor.running_tasks[vm])
celery celery==3.1.17
Twisted Twisted==20.3.0
threadpool threadpool==1.3.2
...@@ -2,19 +2,24 @@ from twisted.protocols.basic import LineReceiver ...@@ -2,19 +2,24 @@ from twisted.protocols.basic import LineReceiver
import json import json
import logging import logging
logger = logging.getLogger() try:
# Python 2: "unicode" is built-in
unicode
except NameError:
unicode = str
logger = logging.getLogger()
class SerialLineReceiverBase(LineReceiver, object): class SerialLineReceiverBase(LineReceiver, object):
delimiter = '\r' delimiter = b'\r'
def send_response(self, response, args): def send_response(self, response, args):
self.transport.write(json.dumps({'response': response, self.transport.write(str.encode(json.dumps({'response': response,
'args': args}) + '\r\n') 'args': args}) + '\r\n'))
def send_command(self, command, args): def send_command(self, command, args):
self.transport.write(json.dumps({'command': command, self.transport.write(str.encode(json.dumps({'command': command,
'args': args}) + '\r\n') 'args': args}) + '\r\n'))
def handle_command(self, command, args): def handle_command(self, command, args):
raise NotImplementedError("Subclass must implement abstract method") raise NotImplementedError("Subclass must implement abstract method")
...@@ -37,9 +42,9 @@ class SerialLineReceiverBase(LineReceiver, object): ...@@ -37,9 +42,9 @@ class SerialLineReceiverBase(LineReceiver, object):
logger.error('[serial] invalid json: %s (%s)' % (data, e)) logger.error('[serial] invalid json: %s (%s)' % (data, e))
return return
if command is not None and isinstance(command, str): if command is not None and isinstance(command, unicode):
logger.debug('received command: %s (%s)' % (command, args)) logger.debug('received command: %s (%s)' % (command, args))
self.handle_command(command, args) self.handle_command(command, args)
elif response is not None and isinstance(response, str): elif response is not None and isinstance(response, unicode):
logger.debug('received reply: %s (%s)' % (response, args)) logger.debug('received reply: %s (%s)' % (response, args))
self.handle_response(response, args) self.handle_response(response, args)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment