Commit 8a0740e8 by Szeberényi Imre

Fured fix

parent b40e6ffa
...@@ -5,44 +5,32 @@ from agentcelery import celery, HOSTNAME ...@@ -5,44 +5,32 @@ from agentcelery import celery, HOSTNAME
from protocol import inotify_handler from protocol import inotify_handler
from os import getenv, listdir, path, environ, kill, getpid from os import getenv, listdir, path, environ, kill, getpid
import signal import signal
import logging
from celery.utils.log import get_task_logger
from celery.utils.log import get_task_logger
from celery.signals import after_setup_task_logger from celery.signals import after_setup_task_logger
from celery.app.log import TaskFormatter from celery.app.log import TaskFormatter
import sys
from multiprocessing import current_process
process = current_process()
process.name = sys.argv[0]
logger = get_task_logger(__name__) logger = get_task_logger(__name__)
level = environ.get('LOGLEVEL', 'INFO') level = environ.get('LOGLEVEL', 'INFO')
logger.setLevel(level) logger.setLevel(level)
#logger = logging.getLogger(__name__)
@after_setup_task_logger.connect @after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs): def setup_task_logger(logger, *args, **kwargs):
for handler in logger.handlers: for handler in logger.handlers:
handler.setFormatter(TaskFormatter('%(asctime)s - %(levelname)s/agentdriver.%(processName)s - %(module)s - %(message)s')) handler.setFormatter(TaskFormatter('[%(levelname)s/%(name)s] %(message)s'))
SOCKET_DIR = getenv('SOCKET_DIR', '/var/lib/libvirt/serial') SOCKET_DIR = getenv('SOCKET_DIR', '/var/lib/libvirt/serial')
old_install_platform_tweaks = Worker.install_platform_tweaks old_install_platform_tweaks = Worker.install_platform_tweaks
def install_platform_tweaks(self, worker): def install_platform_tweaks(self, worker):
self.worker = worker self.worker = worker
old_install_platform_tweaks(self, worker) old_install_platform_tweaks(self, worker)
Worker.install_platform_tweaks = install_platform_tweaks Worker.install_platform_tweaks = install_platform_tweaks
def reactor_started(): def reactor_started():
# print(vars(logger.handlers[0].formatter))
logger.info("reactor_started") logger.info("reactor_started")
reactor.running_tasks = {} reactor.running_tasks = {}
reactor.ended_tasks = {} reactor.ended_tasks = {}
......
## from twisted.internet.defer import Deferred
#from twisted.internet import reactor # threads
#from celery.result import TimeoutError
from celery import Celery from celery import Celery
import serializers import serializers
from kombu import Queue, Exchange from kombu import Queue, Exchange
from os import getenv from os import getenv
#from socket import gethostname
#from threading import Event
import logging
from celery.utils.log import get_task_logger
#from celery.signals import after_setup_task_logger
#from celery.app.log import TaskFormatter
logger = get_task_logger(__name__)
QUEUE_NAME = "localhots.man" QUEUE_NAME = "localhots.man"
AMQP_URI = getenv('AMQP_URI') AMQP_URI = getenv('AMQP_URI')
...@@ -25,11 +14,7 @@ celery.conf.update( ...@@ -25,11 +14,7 @@ celery.conf.update(
CELERY_QUEUES=(Queue(QUEUE_NAME, Exchange('manager', type='direct'), CELERY_QUEUES=(Queue(QUEUE_NAME, Exchange('manager', type='direct'),
routing_key='manager'), ), routing_key='manager'), ),
task_protocol = 1, # Celery 3 compatibility task_protocol = 1, # Celery 3 compatibility
task_serializer='json', )
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='UTC',
enable_utc=True,)
@celery.task(name='vm.tasks.local_agent_tasks.renew') @celery.task(name='vm.tasks.local_agent_tasks.renew')
......
result_backend = 'amqp://' result_backend = 'amqp://'
task_result_expires = 300 task_result_expires = 300
result_expires = 300 result_expires = 300
timezone = 'utc' timezone = 'UTC'
enable_utc = True enable_utc = True
accept_content = ['json', 'pickle_v2'] accept_content = ['json', 'pickle_v2', 'pickle']
task_serializer = 'json' task_serializer = 'json'
result_serializer = 'pickle_v2' result_serializer = 'pickle_v2'
task_store_errors_even_if_ignored = True task_store_errors_even_if_ignored = True
......
/var/lib/libvirt/serial IN_CREATE setfacl -m u:cloud:rw $@/$#
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
from twisted.internet import protocol, reactor, inotify from twisted.internet import protocol, reactor, inotify
import pickle import pickle
import logging
import time import time
import struct import struct
from os import getenv from os import getenv
...@@ -105,20 +104,6 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -105,20 +104,6 @@ class SerialLineReceiver(SerialLineReceiverBase):
routing_key='manager')], routing_key='manager')],
) )
# st = agent_started.apply_async(
# args=(self.factory.vm, version, system),
# queue='localhost.man',
# exchange='manager',
# routing_key='manager',
# serializer='json',
# )
logger.debug("apply_async %r", st.id)
logger.debug("apply_st %r", st.status)
time.sleep(2)
logger.debug("apply_st2 %r", st.status)
time.sleep(8)
logger.debug("apply_st3 %r", st.status)
elif command == 'renew': elif command == 'renew':
renew.apply_async(queue='localhost.man', renew.apply_async(queue='localhost.man',
args=(self.factory.vm, )) args=(self.factory.vm, ))
...@@ -127,7 +112,7 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -127,7 +112,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
args=args) args=args)
def handle_response(self, response, args): def handle_response(self, response, args):
logger.info('handle_response: %s %s', response, args) logger.debug('handle_response: %s %s', response, args)
vm = self.factory.vm vm = self.factory.vm
if response == 'status': if response == 'status':
self.send_to_graphite(args) self.send_to_graphite(args)
...@@ -201,5 +186,3 @@ class SerialLineReceiverFactory(protocol.ClientFactory): ...@@ -201,5 +186,3 @@ class SerialLineReceiverFactory(protocol.ClientFactory):
reactor.running_tasks[vm]['started'].pop(addr) reactor.running_tasks[vm]['started'].pop(addr)
logger.info("active connetions: %s", reactor.running_tasks[vm]) logger.info("active connetions: %s", reactor.running_tasks[vm])
from twisted.protocols.basic import LineReceiver from twisted.protocols.basic import LineReceiver
import json import json
import logging from celery.utils.log import get_task_logger
try: try:
# Python 2: "unicode" is built-in # Python 2: "unicode" is built-in
...@@ -8,7 +8,7 @@ try: ...@@ -8,7 +8,7 @@ try:
except NameError: except NameError:
unicode = str unicode = str
logger = logging.getLogger(__name__) logger = get_task_logger(__name__)
class SerialLineReceiverBase(LineReceiver, object): class SerialLineReceiverBase(LineReceiver, object):
delimiter = b'\r' delimiter = b'\r'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment