Commit da8159bb by Szeberényi Imre

Merge branch 'python3.6' into 'master'

Python3.6

See merge request !1
parents f859b4cc 0a1798be
......@@ -16,6 +16,7 @@ AMQP_URI = getenv('AMQP_URI')
celery = Celery('agent', broker=AMQP_URI)
celery.conf.update(CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300,
CELERYD_PREFETCH_MULTIPLIER=32,
CELERY_QUEUES=(Queue(HOSTNAME + '.agent',
Exchange('agent', type='direct'),
routing_key='agent'), ))
......@@ -135,19 +136,19 @@ def change_ip(vm, interfaces, dns):
@celery.task(name='vm.tasks.local_agent_tasks.renew')
def renew(vm):
print vm
print(vm)
@celery.task(name='vm.tasks.local_agent_tasks.agent_started')
def agent_started(vm):
print vm
print(vm)
@celery.task(name='vm.tasks.local_agent_tasks.agent_stopped')
def agent_stopped(vm):
print vm
print(vm)
@celery.task(name='vm.tasks.local_agent_tasks.agent_ok')
def agent_ok(vm):
print vm
print(vm)
......@@ -26,6 +26,7 @@ Worker.install_platform_tweaks = install_platform_tweaks
def reactor_started():
logger.info("reactor_started")
reactor.running_tasks = {}
reactor.ended_tasks = {}
for f in listdir(SOCKET_DIR):
......
#!/usr/bin/env python
from twisted.internet import protocol, reactor
from twisted.internet import protocol, reactor, inotify
import pickle
import logging
import time
import struct
from os import getenv
import gc
from utils import SerialLineReceiverBase
......@@ -15,9 +16,16 @@ logger = logging.getLogger()
reactor.connections = {}
def numObjsByName(name):
num = 0
for ob in gc.get_objects():
if isinstance(ob, name):
num += 1
return num
class GraphiteClientProtocol(protocol.Protocol):
def connectionMade(self):
logger.info("Monitor connection %s", self.name)
timestamp = time.time()
data_list = []
for key, value in self.data.items():
......@@ -41,18 +49,38 @@ class GraphiteClientFactory(protocol.ClientFactory):
def inotify_handler(self, file, mask):
file = file.asTextMode(encoding='utf-8')
if file.basename().startswith('cloud'):
return
vm = file.basename().replace('vio-', '')
logger.info('inotify: %s (%s)', vm, file.path)
for conn in reactor.connections.get(vm, []):
if file.path == conn.transport.addr:
return
if mask:
logger.info("event %s (%s) on %s" % (', '.join(inotify.humanReadableMask(mask)), mask, file))
if vm in reactor.running_tasks:
for addr in reactor.running_tasks[vm].get('started', None).copy() :
if file.path == addr:
if mask and mask == inotify.IN_DELETE and reactor.running_tasks[vm]['started'][addr]:
_p = reactor.running_tasks[vm]['started'][addr]
logger.info("DELETE %s", _p)
logger.info('NumOBJSerialLineReceiverFactory1: %s' , numObjsByName(SerialLineReceiverFactory))
del _p
reactor.running_tasks[vm]['started'].pop(addr)
logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory))
logger.info('reacror_running1 %s', reactor.running_tasks)
return
elif reactor.running_tasks[vm]['started'][addr]:
return
serial = SerialLineReceiverFactory(vm)
logger.info("connecting to %s (%s)", vm, file.path)
reactor.connectUNIX(file.path, serial)
ic = reactor.connectUNIX(file.path, serial, 10)
logger.info('IConnector state: %s', ic.state)
logger.info('reacror_running2 %s', reactor.running_tasks)
# logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory))
class SerialLineReceiver(SerialLineReceiverBase):
def send_to_graphite(self, data):
logger.info("Send_TO_Graphite")
client = GraphiteClientFactory()
client.protocol.data = data
client.protocol.name = self.factory.vm
......@@ -61,6 +89,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
client)
def handle_command(self, command, args):
logger.info('serial_command: %s %s', command, args)
if command == 'agent_stopped':
agent_stopped.apply_async(queue='localhost.man',
args=(self.factory.vm, ))
......@@ -77,6 +106,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
args=args)
def handle_response(self, response, args):
logger.info('handle_reponse: %s %s', response, args)
vm = self.factory.vm
if response == 'status':
self.send_to_graphite(args)
......@@ -89,17 +119,25 @@ class SerialLineReceiver(SerialLineReceiverBase):
reactor.ended_tasks[vm][uuid] = args
event.set()
def connectionMade(self):
logger.info("connected to %s (%s)", self.factory.vm,
self.transport.addr)
logger.info("connected to %s (%s)", self.factory.vm, self.transport.addr)
logger.info("reactor connections: %s", reactor.connections)
if self.factory.vm not in reactor.connections:
reactor.connections[self.factory.vm] = set()
logger.info("reactor connections factory: %s", reactor.connections[self.factory.vm])
reactor.connections[self.factory.vm].add(self)
logger.info('NumOBJSerialLineReceiver: %s' , numObjsByName(SerialLineReceiver))
def connectionLost(self, reason):
logger.info("disconnected from %s (%s)", self.factory.vm,
self.transport.addr)
logger.info("disconnected from %s (%s)", self.factory.vm, self.transport.addr)
reactor.connections[self.factory.vm].remove(self)
vm = self.factory.vm
# for addr in reactor.running_tasks[vm].get('started', None):
# if addr == self.transport.addr :
# reactor.running_tasks[vm]['started'][addr] = None
logger.info("active connetions: %s", reactor.running_tasks[vm])
class SerialLineReceiverFactory(protocol.ClientFactory):
......@@ -111,3 +149,36 @@ class SerialLineReceiverFactory(protocol.ClientFactory):
reactor.running_tasks[vm] = {}
if vm not in reactor.ended_tasks:
reactor.ended_tasks[vm] = {}
def startedConnecting(self, connector):
vm = self.vm
addr = connector.address
logger.info("startedConnecting to %s (%s)", vm, addr)
logger.info("started connetions: %s", reactor.running_tasks[vm])
if not reactor.running_tasks[vm].get('started', None):
reactor.running_tasks[vm]['started'] = {}
reactor.running_tasks[vm]['started'][addr] = self
logger.info('NumOBJSerialLineReceiverFactory: %s' , numObjsByName(SerialLineReceiverFactory))
logger.info('NumOBJSerialLineReceiver: %s' , numObjsByName(SerialLineReceiver))
logger.info("NumConnetions: %s", reactor.running_tasks)
def clientrConnectionLost(self, connector):
vm = self.vm
addr = connector.address
logger.info("clientConnectionLost with %s (%s)", vm, addr)
for _addr in reactor.running_tasks[vm].get('started', None):
if _addr == addr:
reactor.running_tasks[vm]['started'].pop(addr)
logger.info("active connetions: %s", reactor.running_tasks[vm])
def clientrConnectionFailed(self, connector, reason):
vm = self.vm
addr = connector.address
logger.info("clientConnectionFailed with %s (%s)", vm, connector.addr)
for _addr in reactor.running_tasks[vm].get('started', None):
if _addr == addr:
reactor.running_tasks[vm]['started'].pop(addr)
logger.info("active connetions: %s", reactor.running_tasks[vm])
celery==3.1.17
Twisted==20.3.0
threadpool==1.3.2
# Local development dependencies go here
-r base.txt
# Pro-tip: Try not to put anything here. There should be no dependency in
# production that isn't in development.
-r base.txt
# Test dependencies go here.
-r base.txt
coverage==3.7.1
factory-boy==2.4.1
mock==1.0.1
django-nose==1.4.4
nose==1.3.7
nose-exclude==0.5.0
selenium==2.45.0
#selenose==1.3
-e git+https://github.com/kmmbvnr/django-jenkins.git@019774dc2f668bc66b66f90f97eb8e14ae9566a4#egg=django_jenkins-dev
......@@ -2,19 +2,24 @@ from twisted.protocols.basic import LineReceiver
import json
import logging
logger = logging.getLogger()
try:
# Python 2: "unicode" is built-in
unicode
except NameError:
unicode = str
logger = logging.getLogger()
class SerialLineReceiverBase(LineReceiver, object):
delimiter = '\r'
delimiter = b'\r'
def send_response(self, response, args):
self.transport.write(json.dumps({'response': response,
'args': args}) + '\r\n')
self.transport.write(str.encode(json.dumps({'response': response,
'args': args}) + '\r\n'))
def send_command(self, command, args):
self.transport.write(json.dumps({'command': command,
'args': args}) + '\r\n')
self.transport.write(str.encode(json.dumps({'command': command,
'args': args}) + '\r\n'))
def handle_command(self, command, args):
raise NotImplementedError("Subclass must implement abstract method")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment