Commit edddef0f by Szeberényi Imre

celery fix

parent 8a0740e8
...@@ -3,35 +3,65 @@ import serializers ...@@ -3,35 +3,65 @@ import serializers
from kombu import Queue, Exchange from kombu import Queue, Exchange
from os import getenv from os import getenv
QUEUE_NAME = "localhots.man" QUEUE_NAME = "localhost.man"
AMQP_URI = getenv('AMQP_URI') AMQP_URI = getenv('AMQP_URI')
celery = Celery('agentman', broker=AMQP_URI) celerym = Celery('agentman', broker=AMQP_URI)
celery.config_from_object('celeryconfig') celerym.config_from_object('celeryconfig')
celery.conf.update( celerym.conf.update(
CELERY_QUEUES=(Queue(QUEUE_NAME, Exchange('manager', type='direct'), # Queue declaration
routing_key='manager'), ), CELERY_QUEUES=(
Queue(
QUEUE_NAME,
Exchange('manager', type='direct'),
routing_key='manager',
),
),
task_protocol = 1, # Celery 3 compatibility task_protocol = 1, # Celery 3 compatibility
)
@celery.task(name='vm.tasks.local_agent_tasks.renew') # ROUTING: which task which queue / exchange / routing_key
CELERY_ROUTES={
'vm.tasks.local_agent_tasks.renew': {
'queue': QUEUE_NAME,
'exchange': 'manager',
'routing_key': 'manager',
},
'vm.tasks.local_agent_tasks.agent_started': {
'queue': QUEUE_NAME,
'exchange': 'manager',
'routing_key': 'manager',
},
'vm.tasks.local_agent_tasks.agent_stopped': {
'queue': QUEUE_NAME,
'exchange': 'manager',
'routing_key': 'manager',
},
'vm.tasks.local_agent_tasks.agent_ok': {
'queue': QUEUE_NAME,
'exchange': 'manager',
'routing_key': 'manager',
},
},
)
@celerym.task(name='vm.tasks.local_agent_tasks.renew')
def renew(vm): def renew(vm):
pass pass
@celery.task(name='vm.tasks.local_agent_tasks.agent_started') @celerym.task(name='vm.tasks.local_agent_tasks.agent_started')
def agent_started(vm, version, system): def agent_started(vm, version, system):
pass pass
@celery.task(name='vm.tasks.local_agent_tasks.agent_stopped') @celerym.task(name='vm.tasks.local_agent_tasks.agent_stopped')
def agent_stopped(vm): def agent_stopped(vm):
pass pass
@celery.task(name='vm.tasks.local_agent_tasks.agent_ok') @celerym.task(name='vm.tasks.local_agent_tasks.agent_ok')
def agent_ok(vm): def agent_ok(vm):
pass pass
...@@ -64,7 +64,7 @@ def inotify_handler(self, file, mask): ...@@ -64,7 +64,7 @@ def inotify_handler(self, file, mask):
del _p del _p
reactor.running_tasks[vm]['started'].pop(addr) reactor.running_tasks[vm]['started'].pop(addr)
logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory)) logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory))
logger.info('reacror_running1 %s', reactor.running_tasks) logger.debug('reacror_running1 %s', reactor.running_tasks)
return return
elif reactor.running_tasks[vm]['started'][addr]: elif reactor.running_tasks[vm]['started'][addr]:
return return
...@@ -72,8 +72,8 @@ def inotify_handler(self, file, mask): ...@@ -72,8 +72,8 @@ def inotify_handler(self, file, mask):
logger.info("connecting to %s (%s)", vm, file.path) logger.info("connecting to %s (%s)", vm, file.path)
ic = reactor.connectUNIX(file.path, serial, 10) ic = reactor.connectUNIX(file.path, serial, 10)
logger.info('IConnector state: %s', ic.state) logger.info('IConnector state: %s', ic.state)
logger.info('reacror_running2 %s', reactor.running_tasks) logger.debug('reacror_running2 %s', reactor.running_tasks)
# logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory)) logger.debug('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory))
class SerialLineReceiver(SerialLineReceiverBase): class SerialLineReceiver(SerialLineReceiverBase):
...@@ -88,31 +88,27 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -88,31 +88,27 @@ class SerialLineReceiver(SerialLineReceiverBase):
def handle_command(self, command, args): def handle_command(self, command, args):
logger.info('serial_command: %s %s', command, args) logger.info('serial_command: %s %s', command, args)
from kombu import Exchange, Queue
if command == 'agent_stopped': if command == 'agent_stopped':
agent_stopped.apply_async(queue='localhost.man', logger.debug('agent_stopped %r', self.factory.vm)
args=(self.factory.vm, )) agent_stopped.apply_async(args=(self.factory.vm, ))
elif command == 'agent_started': elif command == 'agent_started':
version = args.get('version', None) version = args.get('version', None)
system = args.get('system', None) system = args.get('system', None)
from kombu import Exchange, Queue
st = agent_started.apply_async( st = agent_started.apply_async(
args=(self.factory.vm, version, system), args=(self.factory.vm, version, system),
exchange='manager',
routing_key='manager',
serializer='json',
declare=[Queue('localhost.man', Exchange('manager','direct'),
routing_key='manager')],
) )
# logger.debug("apply_async %r", st.id)
# logger.debug("apply_st6 %r", st.status)
elif command == 'renew': elif command == 'renew':
renew.apply_async(queue='localhost.man', renew.apply_async(args=(self.factory.vm, ))
args=(self.factory.vm, ))
elif command == 'ping': elif command == 'ping':
self.send_response(response='pong', self.send_response(response='pong', args=args)
args=args)
def handle_response(self, response, args): def handle_response(self, response, args):
logger.debug('handle_response: %s %s', response, args) logger.info('handle_response: %s %s', response, args)
vm = self.factory.vm vm = self.factory.vm
if response == 'status': if response == 'status':
self.send_to_graphite(args) self.send_to_graphite(args)
...@@ -128,7 +124,7 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -128,7 +124,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
def connectionMade(self): def connectionMade(self):
logger.info("connected to %s (%s)", self.factory.vm, self.transport.addr) logger.info("connected to %s (%s)", self.factory.vm, self.transport.addr)
logger.info("reactor connections: %s", reactor.connections) logger.debug("reactor connections: %s", reactor.connections)
if self.factory.vm not in reactor.connections: if self.factory.vm not in reactor.connections:
reactor.connections[self.factory.vm] = set() reactor.connections[self.factory.vm] = set()
logger.info("reactor connections factory: %s", reactor.connections[self.factory.vm]) logger.info("reactor connections factory: %s", reactor.connections[self.factory.vm])
...@@ -166,7 +162,7 @@ class SerialLineReceiverFactory(protocol.ClientFactory): ...@@ -166,7 +162,7 @@ class SerialLineReceiverFactory(protocol.ClientFactory):
reactor.running_tasks[vm]['started'][addr] = self reactor.running_tasks[vm]['started'][addr] = self
logger.info('NumOBJSerialLineReceiverFactory: %s' , numObjsByName(SerialLineReceiverFactory)) logger.info('NumOBJSerialLineReceiverFactory: %s' , numObjsByName(SerialLineReceiverFactory))
logger.info('NumOBJSerialLineReceiver: %s' , numObjsByName(SerialLineReceiver)) logger.info('NumOBJSerialLineReceiver: %s' , numObjsByName(SerialLineReceiver))
logger.info("NumConnetions: %s", reactor.running_tasks) logger.debug("NumConnetions: %s", reactor.running_tasks)
def clientrConnectionLost(self, connector): def clientrConnectionLost(self, connector):
vm = self.vm vm = self.vm
...@@ -186,3 +182,5 @@ class SerialLineReceiverFactory(protocol.ClientFactory): ...@@ -186,3 +182,5 @@ class SerialLineReceiverFactory(protocol.ClientFactory):
reactor.running_tasks[vm]['started'].pop(addr) reactor.running_tasks[vm]['started'].pop(addr)
logger.info("active connetions: %s", reactor.running_tasks[vm]) logger.info("active connetions: %s", reactor.running_tasks[vm])
# serializers.py # serializers.py
from kombu.serialization import register from kombu.serialization import register, enable_insecure_serializers, pickle_protocol
import pickle import pickle
import logging
# PROTOKOLL 2 → Py2.7 is képes lesz deszerializálni
pickle_protocol = 2
logger = logging.getLogger(__name__)
def pickle_v2_dumps(obj): def pickle_v2_dumps(obj):
logger.debug("pickle_v2_dumps CALLED type=%r repr=%r", type(obj), repr(obj))
return pickle.dumps(obj, protocol=2) return pickle.dumps(obj, protocol=2)
def pickle_v2_loads(s): def pickle_v2_loads(s):
logger.debug("pickle_v2_loads CALLED len=%r", len(s))
return pickle.loads(s) return pickle.loads(s)
register( register(
...@@ -16,3 +24,5 @@ register( ...@@ -16,3 +24,5 @@ register(
content_encoding='binary', content_encoding='binary',
) )
#enable_insecure_serializers()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment