Commit eae713a8 by user

first commit

parents
*.pyc
*.swp
# from twisted.internet.defer import Deferred
from twisted.internet import reactor # threads
from celery import Celery
from kombu import Queue, Exchange
from os import getenv
from socket import gethostname
import logging
logger = logging.getLogger(__name__)
HOSTNAME = gethostname()
AMQP_URI = getenv('AMQP_URI', 'amqp://cloud:password@10.7.0.96:5672/circle')
celery = Celery('agent', broker=AMQP_URI, backend='amqp')
celery.conf.update(CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=(Queue(HOSTNAME + '.agent',
Exchange('agent', type='direct'),
routing_key='agent'), ))
@celery.task(name='agent.change_password')
def change_password(vm, password):
reactor.connections[vm].send_command(command='change_password',
args={'password':
password})
logger.debug('change_password(%s,%s)' % (vm, password))
@celery.task(name='agent.set_hostname')
def set_hostname(vm, hostname):
reactor.connections[vm].send_command(command='set_hostname',
args={'hostname':
hostname})
logger.debug('set_hostname(%s,%s)' % (vm, hostname))
@celery.task(name='agent.restart_networking')
def restart_networking(vm):
reactor.connections[vm].send_command(command='restart_networking',
args={})
logger.debug('restart_networking(%s)' % (vm))
@celery.task(name='agent.set_time')
def set_time(vm, time):
reactor.connections[vm].send_command(command='set_time',
args={'time': time})
logger.debug('set_time(%s,%s)' % (vm, time))
@celery.task(name='vm.tasks.local_agent_tasks.agent_started')
def agent_started(vm):
print vm
@celery.task(name='vm.tasks.local_agent_tasks.agent_stopped')
def agent_stopped(vm):
print vm
@celery.task(name='vm.tasks.local_agent_tasks.agent_ok')
def agent_ok(vm):
print vm
# class StartProcTask(celery.Task):
# def run(self):
# print 'HELLO'*10
# self.app.proc = WCProcessProtocol('testing')
# self.app.proc._waiting['startup'] = Deferred()
# def lofasz(asd):
# print 'ezjott%s' % asd
# self.app.proc._waiting['startup'].addCallback(lofasz)
# threads.blockingCallFromThread(reactor, reactor.spawnProcess,
# self.app.proc, 'ls', ['ls'])
# return True
from celery.apps.worker import Worker
from twisted.internet import reactor, inotify
from twisted.python import filepath
from agentcelery import celery
from protocol import inotify_handler
from os import getenv
import logging
SOCKET_DIR = getenv('SOCKET_DIR', '/var/lib/libvirt/serial')
def main():
w = Worker(app=celery, concurrency=1,
pool_cls='threads',
loglevel=logging.DEBUG)
reactor.callInThread(w.run)
notifier = inotify.INotify(reactor)
notifier.startReading()
notifier.watch(filepath.FilePath(SOCKET_DIR),
callbacks=[inotify_handler])
reactor.run()
if __name__ == '__main__':
main()
#!/usr/bin/env python
from twisted.internet import protocol, reactor
import pickle
import logging
import time
import struct
from utils import SerialLineReceiverBase
from agentcelery import agent_started, agent_stopped, agent_ok
logger = logging.getLogger(__name__)
reactor.connections = {}
class GraphiteClientProtocol(protocol.Protocol):
def connectionMade(self):
timestamp = time.time()
data_list = []
for key, value in self.data.items():
if not isinstance(value, dict):
continue
for k, v in value.items():
data_list.append(('agent.%s.%s.%s' % (self.name, key, k),
(timestamp, float(v))))
payload = pickle.dumps(data_list)
header = struct.pack("!L", len(payload))
message = header + payload
self.transport.write(message)
self.transport.loseConnection()
logger.debug('s: %s' % self.data)
class GraphiteClientFactory(protocol.ClientFactory):
protocol = GraphiteClientProtocol
def inotify_handler(self, file, mask):
vm = file.basename()
logger.info('inotify: %s' % vm)
if vm in reactor.connections:
return
serial = SerialLineReceiverFactory(vm)
reactor.connectUNIX(file.path, serial)
class SerialLineReceiver(SerialLineReceiverBase):
def send_to_graphite(self, data):
client = GraphiteClientFactory()
client.protocol.data = data
client.protocol.name = self.factory.vm
reactor.connectTCP('10.7.0.96', 2004, client)
def handle_command(self, command, args):
if command == 'agent_stopped':
agent_stopped.apply_async(queue='localhost.man',
args=(self.factory.vm, ))
if command == 'agent_started':
agent_started.apply_async(queue='localhost.man',
args=(self.factory.vm, ))
if command == 'ping':
self.send_response(response='pong',
args=args)
def handle_response(self, response, args):
if response == 'status':
self.send_to_graphite(args)
def connectionMade(self):
logger.info("connected to %s" % self.factory.vm)
reactor.connections[self.factory.vm] = self
def connectionLost(self, reason):
logger.info("disconnected from %s" % self.factory.vm)
del reactor.connections[self.factory.vm]
class SerialLineReceiverFactory(protocol.ClientFactory):
protocol = SerialLineReceiver
def __init__(self, vm):
self.vm = vm
from twisted.protocols.basic import LineReceiver
import json
import logging
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
class SerialLineReceiverBase(LineReceiver, object):
delimiter = '\r'
def send_response(self, response, args):
self.transport.write(json.dumps({'response': response,
'args': args}) + '\r\n')
def send_command(self, command, args):
self.transport.write(json.dumps({'command': command,
'args': args}) + '\r\n')
def handle_command(self, command, args):
raise NotImplementedError("Subclass must implement abstract method")
def handle_response(self, response, args):
raise NotImplementedError("Subclass must implement abstract method")
def lineReceived(self, data):
try:
data = json.loads(data)
args = data.get('args', {})
if not isinstance(args, dict):
args = {}
command = data.get('command', None)
response = data.get('response', None)
logging.debug('[serial] valid json: %s' % (data, ))
except (ValueError, KeyError) as e:
logging.error('[serial] invalid json: %s (%s)' % (data, e))
return
if command is not None and isinstance(command, unicode):
logging.debug('received command: %s (%s)' % (command, args))
self.handle_command(command, args)
elif response is not None and isinstance(response, unicode):
logging.debug('received reply: %s (%s)' % (response, args))
self.handle_response(response, args)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment