Commit f235c71d by Bach Dániel Committed by cloud

fix scalability and clean up code

 * nested loop sent each metric n times per tick
 * logging framework + terser output
 * fix frequency handling
 * remove boilerplate
 * improve code style
 * closes #109
parent 82348f5a
[Client] [Client]
Debug = True Debug = False
[Metrics] [Metrics]
cpuUsage = 5 cpuUsage = 5
......
import logging
import sys import sys
from src import cnfparse from src import cnfparse
from src import client from src import client
...@@ -5,6 +6,11 @@ from src.collectables import collectables ...@@ -5,6 +6,11 @@ from src.collectables import collectables
def main(): def main():
# define a Handler which writes INFO messages or higher to the sys.stderr
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s: %(levelname)s/%(name)s] %(message)s')
if len(sys.argv) < 2: if len(sys.argv) < 2:
print("usage: manage.py run") print("usage: manage.py run")
if len(sys.argv) is not 2 and sys.argv[1] is not "run": if len(sys.argv) is not 2 and sys.argv[1] is not "run":
......
#!/usr/bin/python #!/usr/bin/python
from datetime import datetime from datetime import datetime
import time from socket import gethostname
import socket import argparse
import pika
import psutil
import logging import logging
import operator
import os import os
import pika
import psutil
import time
logger = logging.getLogger(__name__)
logging.basicConfig()
class Client: class Client:
env_config = { env_config = {
"host": "GRAPHITE_HOST", "server_address": "GRAPHITE_HOST",
"port": "GRAPHITE_PORT", "server_port": "GRAPHITE_PORT",
"amqp_user": "GRAPHITE_AMQP_USER", "amqp_user": "GRAPHITE_AMQP_USER",
"amqp_pass": "GRAPHITE_AMQP_PASSWORD", "amqp_pass": "GRAPHITE_AMQP_PASSWORD",
"amqp_queue": "GRAPHITE_AMQP_QUEUE", "amqp_queue": "GRAPHITE_AMQP_QUEUE",
...@@ -37,58 +38,18 @@ class Client: ...@@ -37,58 +38,18 @@ class Client:
- GRAPHITE_AMQP_VHOST: - GRAPHITE_AMQP_VHOST:
Missing only one of these variables will cause the client not to work. Missing only one of these variables will cause the client not to work.
""" """
hostname = socket.gethostname().split('.') self.name = 'circle.%s' % gethostname()
hostname.reverse() for var, env_var in self.env_config.items():
separator = '.' value = os.getenv(env_var, "")
self.name = 'circle.%(host)s' % {'host': separator.join(hostname)} if value:
self.server_address = str(os.getenv(self.env_config['host'])) setattr(self, var, value)
self.server_port = int(os.getenv(self.env_config['port'])) else:
self.amqp_user = str(os.getenv(self.env_config['amqp_user'])) raise RuntimeError('%s environment variable missing' % env_var)
self.amqp_pass = str(os.getenv(self.env_config['amqp_pass']))
self.amqp_queue = str(os.getenv(self.env_config['amqp_queue']))
self.amqp_vhost = str(os.getenv(self.env_config['amqp_vhost']))
host_check = Client.__check_envvar(self.server_address)
port_check = Client.__check_envvar(self.server_port)
amqp_pass_check = Client.__check_envvar(self.amqp_pass)
amqp_user_check = Client.__check_envvar(self.amqp_user)
amqp_queue_check = Client.__check_envvar(self.amqp_queue)
amqp_vhost_check = Client.__check_envvar(self.amqp_vhost)
if host_check:
print(('%(host)s cannot be found in environmental variables.')
% {'host': self.env_config['host']}
)
raise RuntimeError
if port_check:
print(('%(port)s cannot be found in environmental variables. ')
% {'port': self.env_config['port']}
)
raise RuntimeError
if amqp_user_check or amqp_pass_check:
print(('%(user)s or %(pass)s cannot be '
'found in environmental variables.')
% {'user': self.env_config['amqp_user'],
'pass': self.env_config['amqp_pass']}
)
raise RuntimeError
amqp_pass_check = Client.__check_envvar(self.amqp_pass)
amqp_user_check = Client.__check_envvar(self.amqp_user)
if amqp_vhost_check or amqp_queue_check:
print(('%(queue)s or %(vhost)s cannot be '
'found in environmental variables.')
% {'queue': self.env_config['amqp_queue'],
'vhost': self.env_config['amqp_vhost']}
)
raise RuntimeError
self.debugMode = config["debugMode"] self.debugMode = config["debugMode"]
self.kvmCPU = int(config["kvmCpuUsage"]) self.kvmCPU = int(config["kvmCpuUsage"])
self.kvmMem = int(config["kvmMemoryUsage"]) self.kvmMem = int(config["kvmMemoryUsage"])
self.kvmNet = int(config["kvmNetworkUsage"]) self.kvmNet = int(config["kvmNetworkUsage"])
self.beat = 1 self.beat = 0
self.valid = True
@classmethod
def __check_envvar(cls, variable):
return variable == "None" or variable == ""
def connect(self): def connect(self):
""" """
...@@ -99,22 +60,21 @@ class Client: ...@@ -99,22 +60,21 @@ class Client:
try: try:
credentials = pika.PlainCredentials(self.amqp_user, self.amqp_pass) credentials = pika.PlainCredentials(self.amqp_user, self.amqp_pass)
params = pika.ConnectionParameters(host=self.server_address, params = pika.ConnectionParameters(host=self.server_address,
port=self.server_port, port=int(self.server_port),
virtual_host=self.amqp_vhost, virtual_host=self.amqp_vhost,
credentials=credentials) credentials=credentials)
self.connection = pika.BlockingConnection(params) self.connection = pika.BlockingConnection(params)
self.channel = self.connection.channel() self.channel = self.connection.channel()
return True logger.info('Connection established to %s.', self.server_address)
except RuntimeError: except RuntimeError:
print ('[ERROR] Cannot connect to the server. ' logger.error('Cannot connect to the server. '
'Parameters could be wrong.' 'Parameters may be wrong.')
) logger.error("An error has occured while connecting to the server")
return False raise
except: except: # FIXME
print ('[ERROR] Cannot connect to the server. There is no one ' logger.error('Cannot connect to the server. There is no one '
'listening on the other side.' 'listening on the other side.')
) raise
return False
def disconnect(self): def disconnect(self):
""" """
...@@ -124,10 +84,9 @@ class Client: ...@@ -124,10 +84,9 @@ class Client:
try: try:
self.channel.close() self.channel.close()
self.connection.close() self.connection.close()
except RuntimeError: except RuntimeError as e:
print('[ERROR] An error has occured ' logger.error('An error has occured while disconnecting. %s', unicode(e))
'while disconnecting from the server.' raise
)
def send(self, message): def send(self, message):
""" """
...@@ -141,9 +100,7 @@ class Client: ...@@ -141,9 +100,7 @@ class Client:
routing_key='', body="\n".join(message)) routing_key='', body="\n".join(message))
return True return True
except: except:
print('[ERROR] An error has occured ' logger.error('An error has occured while sending metrics.')
'while sending metrics to the server.'
)
return False return False
def collect_node(self, metricCollectors): def collect_node(self, metricCollectors):
...@@ -156,7 +113,7 @@ class Client: ...@@ -156,7 +113,7 @@ class Client:
for collector in metricCollectors: for collector in metricCollectors:
collector_function = collector[0] collector_function = collector[0]
phase = collector[1] phase = collector[1]
if (self.beat % phase) is 0: if self.beat % phase == 0:
stat = collector_function() stat = collector_function()
metrics.append(('%(hostname)s.%(name)s %(value)f %(time)d') % metrics.append(('%(hostname)s.%(name)s %(value)f %(time)d') %
{'hostname': self.name, {'hostname': self.name,
...@@ -172,84 +129,60 @@ class Client: ...@@ -172,84 +129,60 @@ class Client:
resource usages about the vms. resource usages about the vms.
""" """
metrics = [] metrics = []
now = time.time()
running_vms = [] running_vms = []
procList = psutil.get_process_list()
beats = { beats = {
'mem': self.beat % self.kvmMem, 'mem': self.beat % self.kvmMem == 0,
'cpu': self.beat % self.kvmCPU, 'cpu': self.beat % self.kvmCPU == 0,
'net': self.beat % self.kvmNet 'net': self.beat % self.kvmNet == 0
} }
for entry in procList:
try: if beats['cpu'] or beats['mem']:
entry_name = entry.name for entry in psutil.get_process_list():
if entry_name in "kvm":
cmdLine = entry.as_dict()["cmdline"]
search = [cmd_param_index for cmd_param_index, cmd_param in
enumerate(cmdLine)
if cmd_param == "-name"]
if not entry.is_running():
break
memory = [cmd_param_index for cmd_param_index, cmd_param in
enumerate(cmdLine)
if cmd_param == "-m"]
if not entry.is_running():
break
try: try:
running_vms.append([cmdLine[search[0] + 1], if entry.name == 'kvm':
entry.pid, parser = argparse.ArgumentParser()
int(entry.as_dict()["cmdline"][ parser.add_argument('-name')
memory[0] + 1])]) parser.add_argument('--memory-size', '-m ', type=int)
except IndexError: args, unknown = parser.parse_known_args(entry.cmdline[1:])
pass
for vm in running_vms: process = psutil.Process(entry.pid)
vm_proc = psutil.Process(vm[1])
if ((beats['cpu'] is 0) and vm_proc.is_running()): mem_perc = process.get_memory_percent() / 100 * args.memory_size
mem_perc = vm_proc.get_memory_percent() / 100 * vm[2] metrics.append('vm.%(name)s.memory.usage %(value)f '
metrics.append("vm.%s.memory.usage %f %d" '%(time)d' % {'name': args.name,
% (vm[0], mem_perc, time.time())) 'value': mem_perc,
if ((beats['mem'] is 0) and vm_proc.is_running()): 'time': now})
systemtime = vm_proc.get_cpu_times().system user_time, system_time = process.get_cpu_times()
usertime = vm_proc.get_cpu_times().user sum_time = system_time + user_time
sumCpu = systemtime + usertime metrics.append('vm.%(name)s.cpu.usage %(value)f '
metrics.append("vm.%s.cpu.usage %f %d" '%(time)d' % {'name': args.name,
% (vm[0], sumCpu, time.time())) 'value': sum_time,
'time': now})
running_vms.append(args.name)
except psutil.NoSuchProcess:
logger.warning('Process %d lost.', entry.pid)
interfaces_list = psutil.network_io_counters(pernic=True) interfaces_list = psutil.network_io_counters(pernic=True)
if beats['net'] is 0: if beats['net']:
for vm in running_vms: for interface, data in interfaces_list.iteritems():
interfaces_list_enum = enumerate(interfaces_list) try:
for iname_index, iname in interfaces_list_enum: vm, vlan = interface.rsplit('-', 1)
if vm[0] in iname: except ValueError:
metrics.append( continue
('vm.%(name)s.network.packets_sent_%(interface)s ' if vm in running_vms:
'%(data)f %(time)d') % for metric in ('packets_sent', 'packets_recv',
{'name': vm[0], 'bytes_sent', 'bytes_recv'):
'interface': iname,
'time': time.time(),
'data': interfaces_list[iname].packets_sent})
metrics.append(
('vm.%(name)s.network.packets_recv_%(interface)s '
'%(data)f %(time)d') %
{'name': vm[0],
'interface': iname,
'time': time.time(),
'data': interfaces_list[iname].packets_recv})
metrics.append( metrics.append(
('vm.%(name)s.network.bytes_sent_%(interface)s ' 'vm.%(name)s.network.%(metric)s-'
'%(data)f %(time)d') % '%(interface)s %(data)f %(time)d' %
{'name': vm[0], {'name': vm,
'interface': iname, 'interface': vlan,
'time': time.time(), 'metric': metric,
'data': interfaces_list[iname].bytes_sent}) 'time': now,
metrics.append( 'data': getattr(data, metric)})
('vm.%(name)s.network.bytes_recv_%(interface)s '
'%(data)f %(time)d') % if (self.beat % 30) == 0:
{'name': vm[0],
'interface': iname,
'time': time.time(),
'data': interfaces_list[iname].bytes_recv})
except psutil.NoSuchProcess:
print('[ERROR LOG] Process lost.')
if (self.beat % 30) is 0:
metrics.append( metrics.append(
('%(host)s.vmcount %(data)d %(time)d') % ('%(host)s.vmcount %(data)d %(time)d') %
{'host': self.name, {'host': self.name,
...@@ -262,21 +195,8 @@ class Client: ...@@ -262,21 +195,8 @@ class Client:
""" """
items = metricCollectors + [["kvmCpuUsage", self.kvmMem], [ items = metricCollectors + [["kvmCpuUsage", self.kvmMem], [
"kvmMemoryUsage", self.kvmCPU], ["kvmNetworkUsage", self.kvmNet]] "kvmMemoryUsage", self.kvmCPU], ["kvmNetworkUsage", self.kvmNet]]
max = items[0][1] freqs = set([i[1] for i in items if i[1]>0])
for item in items: return reduce(operator.mul, freqs, 1)
if max < item[1]:
max = item[1]
return max
@classmethod
def print_metrics(cls, metrics):
for metric in metrics:
parts = metric.split(' ')
parts[2] = datetime.fromtimestamp(int(parts[2])).strftime('%Y-%m-%d %H:%M:%S')
print('********************************************')
print('[M] %(title)s' % {'title': parts[0]})
print(' -> data: %(data)s' % {'data': parts[1]})
print(' -> time: %(time)s' % {'time': parts[2]})
def run(self, metricCollectors=[]): def run(self, metricCollectors=[]):
""" """
...@@ -284,32 +204,25 @@ class Client: ...@@ -284,32 +204,25 @@ class Client:
metricCollectors parameter that should be provided by the collectables metricCollectors parameter that should be provided by the collectables
modul to work properly. modul to work properly.
""" """
if self.connect() is False: self.connect()
hostdata = self.server_address + ':' + str(self.server_port)
print("[ERROR] An error has occured while connecting to the "
"server on %(host)s."
% {'host': hostdata})
return
else:
print('[SUCCESS] Connection established to %(host)s:%(port)s.'
% {'host': self.server_address,
'port': self.server_port})
try: try:
maxFrequency = self.get_frequency(metricCollectors) maxFrequency = self.get_frequency(metricCollectors)
while True: while True:
nodeMetrics = self.collect_node(metricCollectors) nodeMetrics = self.collect_node(metricCollectors)
vmMetrics = self.collect_vms() vmMetrics = self.collect_vms()
metrics = nodeMetrics + vmMetrics metrics = nodeMetrics + vmMetrics
if metrics:
if self.debugMode == "True": if self.debugMode == "True":
Client.print_metrics(metrics) for i in metrics:
if len(metrics) is not 0: logger.debug('Metric to send: %s', i)
if self.send(metrics) is False: logger.info("%d metrics sent", len(metrics))
if not self.send(metrics):
raise RuntimeError raise RuntimeError
time.sleep(1) time.sleep(1)
self.beat = self.beat + 1 self.beat += 1
if ((self.beat % (maxFrequency + 1)) is 0): if self.beat % maxFrequency == 0:
self.beat = 1 self.beat = 0
except KeyboardInterrupt: except KeyboardInterrupt:
print("[x] Reporting has stopped by the user. Exiting...") logger.info("Reporting has stopped by the user. Exiting...")
finally: finally:
self.disconnect() self.disconnect()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment