Commit f859b4cc by Szeberényi Imre

Initial version, Python 2.7

parents
*.pyc
*.swp
*.swo
.ropeproject
# from twisted.internet.defer import Deferred
from twisted.internet import reactor # threads
from celery.result import TimeoutError
from celery import Celery
from kombu import Queue, Exchange
from os import getenv
from socket import gethostname
from threading import Event
import logging
logger = logging.getLogger()
HOSTNAME = gethostname().split('.')[0]
AMQP_URI = getenv('AMQP_URI')
celery = Celery('agent', broker=AMQP_URI)
celery.conf.update(CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=(Queue(HOSTNAME + '.agent',
Exchange('agent', type='direct'),
routing_key='agent'), ))
def send_command(vm, command, *args, **kwargs):
uuid = kwargs.get('uuid', None)
timeout = kwargs.get('timeout', 58)
if uuid:
event = Event()
reactor.running_tasks[vm][uuid] = event
reactor.ended_tasks[vm][uuid] = None
for conn in reactor.connections[vm]:
if command == 'append' and 'vio-cloud' not in conn.transport.addr:
continue
logger.info('%s(%s, %s)', command, vm,
', '.join(map(lambda x: str(x)[:100], kwargs.values())))
conn.send_command(command=command, args=kwargs)
if uuid:
success = event.wait(timeout)
retval = reactor.ended_tasks[vm][uuid]
del reactor.ended_tasks[vm][uuid]
del reactor.running_tasks[vm][uuid]
if not success:
raise TimeoutError()
return retval
@celery.task(name='agent.change_password')
def change_password(vm, password):
send_command(vm, command='change_password', password=password)
@celery.task(name='agent.set_hostname')
def set_hostname(vm, hostname):
send_command(vm, command='set_hostname', hostname=hostname)
@celery.task(name='agent.restart_networking')
def restart_networking(vm):
send_command(vm, command='restart_networking')
@celery.task(name='agent.set_time')
def set_time(vm, time):
send_command(vm, command='set_time', time=time)
@celery.task(name='agent.mount_store')
def mount_store(vm, host, username, password):
send_command(vm, command='mount_store', host=host,
username=username, password=password)
@celery.task(name='agent.cleanup')
def cleanup(vm):
send_command(vm, command='cleanup')
@celery.task(name='agent.start_access_server')
def start_access_server(vm):
send_command(vm, command='start_access_server')
@celery.task(name='agent.append')
def append(vm, data, filename, chunk_number):
kwargs = {'command': 'append', 'data': data, 'chunk_number': chunk_number,
'filename': filename, 'uuid': append.request.id}
return send_command(vm, **kwargs)
@celery.task(name='agent.update_legacy')
def update_legacy(vm, data):
kwargs = {'command': 'update', 'uuid': update_legacy.request.id,
'data': data}
return send_command(vm, **kwargs)
@celery.task(name='agent.update')
def update(vm, filename, executable, checksum):
kwargs = {'command': 'update', 'uuid': update.request.id,
'filename': filename, 'checksum': checksum,
'executable': executable}
return send_command(vm, **kwargs)
@celery.task(name='agent.add_keys')
def add_keys(vm, keys):
send_command(vm, command='add_keys', keys=keys)
@celery.task(name='agent.del_keys')
def del_keys(vm, keys):
send_command(vm, command='del_keys', keys=keys)
@celery.task(name='agent.get_keys')
def get_keys(vm):
return send_command(vm, command='get_keys')
@celery.task(name='agent.send_expiration')
def send_expiration(vm, url):
return send_command(vm, command='send_expiration',
url=url)
@celery.task(name='agent.change_ip')
def change_ip(vm, interfaces, dns):
send_command(vm, command='change_ip', interfaces=interfaces, dns=dns)
@celery.task(name='vm.tasks.local_agent_tasks.renew')
def renew(vm):
print vm
@celery.task(name='vm.tasks.local_agent_tasks.agent_started')
def agent_started(vm):
print vm
@celery.task(name='vm.tasks.local_agent_tasks.agent_stopped')
def agent_stopped(vm):
print vm
@celery.task(name='vm.tasks.local_agent_tasks.agent_ok')
def agent_ok(vm):
print vm
from celery.apps.worker import Worker
from twisted.internet import reactor, inotify
from twisted.python import filepath
from agentcelery import celery, HOSTNAME
from protocol import inotify_handler
from os import getenv, listdir, path, environ, kill, getpid
import signal
import logging
logging.basicConfig()
logger = logging.getLogger()
level = environ.get('LOGLEVEL', 'INFO')
logger.setLevel(level)
SOCKET_DIR = getenv('SOCKET_DIR', '/var/lib/libvirt/serial')
old_install_platform_tweaks = Worker.install_platform_tweaks
def install_platform_tweaks(self, worker):
self.worker = worker
old_install_platform_tweaks(self, worker)
Worker.install_platform_tweaks = install_platform_tweaks
def reactor_started():
reactor.running_tasks = {}
reactor.ended_tasks = {}
for f in listdir(SOCKET_DIR):
f = path.join(SOCKET_DIR, f)
inotify_handler(None, filepath.FilePath(f), None)
def reactor_stopped(worker):
logger.info("Reactor stopped.")
kill(getpid(), signal.SIGKILL)
def main():
w = Worker(app=celery, concurrency=1,
pool_cls='threads',
hostname=HOSTNAME + '.agentdriver',
without_mingle=True, without_gossip=True,
loglevel=level)
reactor.callInThread(w.start)
notifier = inotify.INotify(reactor)
notifier.startReading()
notifier.watch(filepath.FilePath(SOCKET_DIR),
callbacks=[inotify_handler])
reactor.callWhenRunning(reactor_started)
reactor.addSystemEventTrigger("before", "shutdown", reactor_stopped, w)
reactor.run()
if __name__ == '__main__':
main()
description "CIRCLE agentdriver"
start on runlevel [2345]
stop on runlevel [!2345]
respawn
respawn limit 30 30
setgid cloud
setuid cloud
script
cd /home/cloud/agentdriver
. /home/cloud/.virtualenvs/agentdriver/local/bin/activate
. /home/cloud/.virtualenvs/agentdriver/local/bin/postactivate
python agentdriver.py
end script
[Unit]
Description=CIRCLE agentdriver
BindsTo=node.service
[Service]
User=cloud
Group=cloud
WorkingDirectory=/home/cloud/agentdriver
ExecStart=/bin/bash -c "source /etc/profile; workon agentdriver; exec python agentdriver.py"
Restart=always
[Install]
WantedBy=multi-user.target
#!/usr/bin/env python
from twisted.internet import protocol, reactor
import pickle
import logging
import time
import struct
from os import getenv
from utils import SerialLineReceiverBase
from agentcelery import agent_started, agent_stopped, renew
logger = logging.getLogger()
reactor.connections = {}
class GraphiteClientProtocol(protocol.Protocol):
def connectionMade(self):
timestamp = time.time()
data_list = []
for key, value in self.data.items():
if not isinstance(value, dict):
continue
for k, v in value.items():
data_list.append(('agent.%s.%s.%s' % (self.name, key, k),
(timestamp, float(v))))
payload = pickle.dumps(data_list)
header = struct.pack("!L", len(payload))
message = header + payload
self.transport.write(message)
self.transport.loseConnection()
logger.debug('s: %s' % self.data)
logger.info("Monitor info from: %s", self.name)
class GraphiteClientFactory(protocol.ClientFactory):
protocol = GraphiteClientProtocol
def inotify_handler(self, file, mask):
vm = file.basename().replace('vio-', '')
logger.info('inotify: %s (%s)', vm, file.path)
for conn in reactor.connections.get(vm, []):
if file.path == conn.transport.addr:
return
serial = SerialLineReceiverFactory(vm)
logger.info("connecting to %s (%s)", vm, file.path)
reactor.connectUNIX(file.path, serial)
class SerialLineReceiver(SerialLineReceiverBase):
def send_to_graphite(self, data):
client = GraphiteClientFactory()
client.protocol.data = data
client.protocol.name = self.factory.vm
reactor.connectTCP(getenv('GRAPHITE_HOST', '127.0.0.1'),
int(getenv('GRAPHITE_PORT', '2004')),
client)
def handle_command(self, command, args):
if command == 'agent_stopped':
agent_stopped.apply_async(queue='localhost.man',
args=(self.factory.vm, ))
elif command == 'agent_started':
version = args.get('version', None)
system = args.get('system', None)
agent_started.apply_async(queue='localhost.man',
args=(self.factory.vm, version, system))
elif command == 'renew':
renew.apply_async(queue='localhost.man',
args=(self.factory.vm, ))
elif command == 'ping':
self.send_response(response='pong',
args=args)
def handle_response(self, response, args):
vm = self.factory.vm
if response == 'status':
self.send_to_graphite(args)
else:
uuid = args.get('uuid', None)
if not uuid:
return
event = reactor.running_tasks[vm].get(uuid, None)
if event:
reactor.ended_tasks[vm][uuid] = args
event.set()
def connectionMade(self):
logger.info("connected to %s (%s)", self.factory.vm,
self.transport.addr)
if self.factory.vm not in reactor.connections:
reactor.connections[self.factory.vm] = set()
reactor.connections[self.factory.vm].add(self)
def connectionLost(self, reason):
logger.info("disconnected from %s (%s)", self.factory.vm,
self.transport.addr)
reactor.connections[self.factory.vm].remove(self)
class SerialLineReceiverFactory(protocol.ClientFactory):
protocol = SerialLineReceiver
def __init__(self, vm):
self.vm = vm
if vm not in reactor.running_tasks:
reactor.running_tasks[vm] = {}
if vm not in reactor.ended_tasks:
reactor.ended_tasks[vm] = {}
from twisted.protocols.basic import LineReceiver
import json
import logging
logger = logging.getLogger()
class SerialLineReceiverBase(LineReceiver, object):
delimiter = '\r'
def send_response(self, response, args):
self.transport.write(json.dumps({'response': response,
'args': args}) + '\r\n')
def send_command(self, command, args):
self.transport.write(json.dumps({'command': command,
'args': args}) + '\r\n')
def handle_command(self, command, args):
raise NotImplementedError("Subclass must implement abstract method")
def handle_response(self, response, args):
raise NotImplementedError("Subclass must implement abstract method")
def lineReceived(self, data):
if not data.strip(): # ignore empty lines
return
try:
data = json.loads(data)
args = data.get('args', {})
if not isinstance(args, dict):
args = {}
command = data.get('command', None)
response = data.get('response', None)
logger.debug('[serial] valid json: %s' % (data, ))
except (ValueError, KeyError) as e:
logger.error('[serial] invalid json: %s (%s)' % (data, e))
return
if command is not None and isinstance(command, unicode):
logger.debug('received command: %s (%s)' % (command, args))
self.handle_command(command, args)
elif response is not None and isinstance(response, unicode):
logger.debug('received reply: %s (%s)' % (response, args))
self.handle_response(response, args)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment