Commit e0fb01fd by Bach Dániel

Merge branch 'issue-89-v2' into 'master'

Monitor response time, celery availability, boot time and running template instances

Graphite: http://pc3.szgt.uni-miskolc.hu:12275/
parents 0ac786b5 75cd80a8
...@@ -271,6 +271,7 @@ LOCAL_APPS = ( ...@@ -271,6 +271,7 @@ LOCAL_APPS = (
'dashboard', 'dashboard',
'manager', 'manager',
'acl', 'acl',
'monitor',
) )
# See: https://docs.djangoproject.com/en/dev/ref/settings/#installed-apps # See: https://docs.djangoproject.com/en/dev/ref/settings/#installed-apps
......
...@@ -30,7 +30,9 @@ celery = Celery('manager', ...@@ -30,7 +30,9 @@ celery = Celery('manager',
'vm.tasks.local_agent_tasks', 'vm.tasks.local_agent_tasks',
'storage.tasks.local_tasks', 'storage.tasks.local_tasks',
'storage.tasks.periodic_tasks', 'storage.tasks.periodic_tasks',
'firewall.tasks.local_tasks', ]) 'firewall.tasks.local_tasks',
'monitor.tasks.local_periodic_tasks',
])
celery.conf.update( celery.conf.update(
CELERY_RESULT_BACKEND='cache', CELERY_RESULT_BACKEND='cache',
...@@ -69,6 +71,24 @@ celery.conf.update( ...@@ -69,6 +71,24 @@ celery.conf.update(
'schedule': timedelta(hours=24), 'schedule': timedelta(hours=24),
'options': {'queue': 'localhost.man'} 'options': {'queue': 'localhost.man'}
}, },
'monitor.measure_response_time': {
'task': 'monitor.tasks.local_periodic_tasks.'
'measure_response_time',
'schedule': timedelta(seconds=30),
'options': {'queue': 'localhost.man'}
},
'monitor.check_celery_queues': {
'task': 'monitor.tasks.local_periodic_tasks.'
'check_celery_queues',
'schedule': timedelta(seconds=60),
'options': {'queue': 'localhost.man'}
},
'monitor.instance_per_template': {
'task': 'monitor.tasks.local_periodic_tasks.'
'instance_per_template',
'schedule': timedelta(seconds=30),
'options': {'queue': 'localhost.man'}
},
} }
) )
from itertools import islice
from socket import gethostname
import logging
import os
import pika
logger = logging.getLogger(__name__)
class Client:
env_config = {
"server_address": "GRAPHITE_HOST",
"server_port": "GRAPHITE_AMQP_PORT",
"amqp_user": "GRAPHITE_AMQP_USER",
"amqp_pass": "GRAPHITE_AMQP_PASSWORD",
"amqp_queue": "GRAPHITE_AMQP_QUEUE",
"amqp_vhost": "GRAPHITE_AMQP_VHOST",
}
def __init__(self):
"""
Constructor of the client class that is responsible for handling the
communication between the graphite server and the data source. In
order to initialize a client you must have the following
environmental varriables:
- GRAPHITE_SERVER_ADDRESS:
- GRAPHITE_SERVER_PORT:
- GRAPHITE_AMQP_USER:
- GRAPHITE_AMQP_PASSWORD:
- GRAPHITE_AMQP_QUEUE:
- GRAPHITE_AMQP_VHOST:
Missing only one of these variables will cause the client not to work.
"""
self.name = 'circle.%s' % gethostname()
for var, env_var in self.env_config.items():
value = os.getenv(env_var, "")
if value:
setattr(self, var, value)
else:
raise RuntimeError('%s environment variable missing' % env_var)
def connect(self):
"""
This method creates the connection to the queue of the graphite
server using the environmental variables given in the constructor.
"""
try:
credentials = pika.PlainCredentials(self.amqp_user, self.amqp_pass)
params = pika.ConnectionParameters(host=self.server_address,
port=int(self.server_port),
virtual_host=self.amqp_vhost,
credentials=credentials)
self.connection = pika.BlockingConnection(params)
self.channel = self.connection.channel()
logger.info('Connection established to %s.', self.server_address)
except RuntimeError:
logger.error('Cannot connect to the server. '
'Parameters may be wrong.')
logger.error("An error has occured while connecting to the server")
raise
except: # FIXME
logger.error('Cannot connect to the server. There is no one '
'listening on the other side.')
raise
def disconnect(self):
"""
Break up the connection to the graphite server. If something went
wrong while disconnecting it simply cut the connection up.
"""
try:
self.channel.close()
self.connection.close()
except RuntimeError as e:
logger.error('An error has occured while disconnecting. %s',
unicode(e))
raise
def _send(self, message):
"""
Send the message given in the parameters given in the message
parameter. This function expects that the graphite server want the
metric name given in the message body. (This option must be enabled
on the server. Otherwise it can't parse the data sent.)
"""
body = "\n".join(message)
try:
self.channel.basic_publish(exchange=self.amqp_queue,
routing_key='', body=body)
except:
logger.error('An error has occured while sending metrics (%dB).',
len(body))
raise
@staticmethod
def _chunker(seq, size):
"""Yield seq in size-long chunks.
"""
for pos in xrange(0, len(seq), size):
yield islice(seq, pos, pos + size)
def send(self, message):
self.connect()
try:
for chunk in self._chunker(message, 100):
self._send(chunk)
finally:
self.disconnect()
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
import logging
import requests
from time import time
from django.conf import settings
from manager.mancelery import celery
from vm.tasks.vm_tasks import check_queue
from vm.models import Node, InstanceTemplate
from storage.models import DataStore
from monitor.client import Client
logger = logging.getLogger(__name__)
@celery.task(ignore_result=True)
def measure_response_time():
r = requests.get(settings.DJANGO_URL, verify=False)
total_miliseconds = (
r.elapsed.seconds * 10**6 +
r.elapsed.microseconds) / 1000
Client().send([
"%(name)s %(val)d %(time)s" % {
'name': "portal.response_time",
'val': total_miliseconds,
'time': time(),
}
])
@celery.task(ignore_result=True)
def check_celery_queues():
graphite_string = lambda component, hostname, celery, is_alive, time: (
"%s.%s.celery-queues.%s %d %s" % (
component, hostname, celery, 1 if is_alive else 0, time)
)
metrics = []
for n in Node.objects.all(): # disabled, offline nodes?
for s in ["fast", "slow"]:
is_queue_alive = check_queue(n.host.hostname, "vm", s)
metrics.append(graphite_string("circle", n.host.hostname,
"vm-" + s, is_queue_alive, time()))
is_net_queue_alive = check_queue(n.host.hostname, "net", "fast")
metrics.append(graphite_string("circle", n.host.hostname,
"net-fast", is_net_queue_alive, time()))
is_agent_queue_alive = check_queue(n.host.hostname, "agent")
metrics.append(graphite_string("circle", n.host.hostname, "agent",
is_agent_queue_alive, time()))
for ds in DataStore.objects.all():
for s in ["fast", "slow"]:
is_queue_alive = check_queue(ds.hostname, "vm", s)
metrics.append(graphite_string("storage", ds.hostname,
"storage-" + s, is_queue_alive,
time()))
Client().send(metrics)
@celery.task(ignore_result=True)
def instance_per_template():
graphite_string = lambda pk, state, val, time: (
"template.%d.instances.%s %d %s" % (
pk, state, val, time)
)
metrics = []
for t in InstanceTemplate.objects.all():
base = t.instance_set.filter(destroyed_at=None)
running = base.filter(status="RUNNING").count()
not_running = base.exclude(status="RUNNING").count()
metrics.append(graphite_string(t.pk, "running", running, time()))
metrics.append(graphite_string(t.pk, "not_running", not_running,
time()))
Client().send(metrics)
...@@ -24,7 +24,9 @@ from base64 import encodestring ...@@ -24,7 +24,9 @@ from base64 import encodestring
from StringIO import StringIO from StringIO import StringIO
from tarfile import TarFile, TarInfo from tarfile import TarFile, TarInfo
from django.conf import settings from django.conf import settings
from django.utils import timezone
from celery.result import TimeoutError from celery.result import TimeoutError
from monitor.client import Client
def send_init_commands(instance, act, vm): def send_init_commands(instance, act, vm):
...@@ -86,12 +88,33 @@ def agent_started(vm, version=None): ...@@ -86,12 +88,33 @@ def agent_started(vm, version=None):
pass pass
if not initialized: if not initialized:
measure_boot_time(instance)
send_init_commands(instance, act, vm) send_init_commands(instance, act, vm)
with act.sub_activity('start_access_server'): with act.sub_activity('start_access_server'):
start_access_server.apply_async(queue=queue, args=(vm, )) start_access_server.apply_async(queue=queue, args=(vm, ))
def measure_boot_time(instance):
if not instance.template:
return
from vm.models import InstanceActivity
deploy_time = InstanceActivity.objects.filter(
instance=instance, activity_code="vm.Instance.deploy"
).latest("finished").finished
total_boot_time = (timezone.now() - deploy_time).total_seconds()
Client().send([
"template.%(pk)d.boot_time %(val)f %(time)s" % {
'pk': instance.template.pk,
'val': total_boot_time,
'time': time.time(),
}
])
@celery.task @celery.task
def agent_stopped(vm): def agent_stopped(vm):
from vm.models import Instance, InstanceActivity from vm.models import Instance, InstanceActivity
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment