Commit 34f94315 by Nagy Gergő

Fixed formating errors and added scheduled reporting.

parent b621e93d
[Client]
Frequency = 0
Debug = True
[Server]
Address = 10.7.0.96
Port = 5672
[AMQP]
Queue = monitor
User = cloud
Pass = password
Vhost = monitor
[Metrics]
cpuUsage = True
memoryUsage = True
......
#!/usr/bin/python
import platform, collections, time, socket, pika, struct
# import platform
# import collections
import time
import socket
import pika
# import struct
import logging
import os
logging.basicConfig()
class Client:
def __init__(self, config):
"""
Constructor of the client requires a configuration provided by cnfparse
modul. It is a dictionary: {server_address, server_port, frequency,
debugMode, amqp_user, amqp_pass, amqp_queue}.
modul. It is a dictionary: {debugMode}
"""
hostname = socket.gethostname().split('.')
hostname.reverse()
self.name = "circle." + ".".join(hostname)
self.server_address = str(config["server_address"])
self.server_port = int(config["server_port"])
self.delay = int(config["frequency"])
self.server_address = str(os.getenv("GRAPHITE_SERVER_ADDRESS"))
self.server_port = int(os.getenv("GRAPHITE_SERVER_PORT"))
self.debugMode = config["debugMode"]
self.amqp_user = str(config["amqp_user"])
self.amqp_pass = str(config["amqp_pass"])
self.amqp_queue = str(config["amqp_queue"])
self.amqp_virtual_host = str(config["amqp_virtual_host"])
self.amqp_user = str(os.getenv("GRAPHITE_AMQP_USER"))
self.amqp_pass = str(os.getenv("GRAPHITE_AMQP_PASSWORD"))
self.amqp_queue = str(os.getenv("GRAPHITE_AMQP_QUEUE"))
self.amqp_vhost = str(os.getenv("GRAPHITE_AMQP_VHOST"))
self.beat = 1
def __connect(self):
"""
......@@ -31,13 +39,11 @@ class Client:
"""
try:
credentials = pika.PlainCredentials(self.amqp_user, self.amqp_pass)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=self.server_address,
port=self.server_port,
virtual_host=self.amqp_virtual_host,
credentials=credentials
)
)
params = pika.ConnectionParameters(host=self.server_address,
port=self.server_port,
virtual_host=self.amqp_vhost,
credentials=credentials)
self.connection = pika.BlockingConnection(params)
self.channel = self.connection.channel()
return True
except:
......@@ -54,8 +60,8 @@ class Client:
"""
Send the message given in the parameters.
"""
self.channel.basic_publish(exchange=self.amqp_queue, routing_key='',
body="\n".join(message))
self.channel.basic_publish(exchange=self. amqp_queue,
routing_key='', body="\n".join(message))
def __collectFromNode(self, metricCollectors):
"""
......@@ -64,35 +70,46 @@ class Client:
"""
metrics = []
for collector in metricCollectors:
stat = collector()
metrics.append((
self.name + "." + stat.name +
" %d" % (stat.value) +
" %d" % (time.time())
))
if (self.beat % collector[1]) is 0:
stat = collector[0]()
metrics.append((self.name + "." +
stat.name + " %d" % (stat.value)
+ " %d" % (time.time())
))
return metrics
def startReporting(self, metricCollectors = [], debugMode = False):
def getMaxFrequency(self, metricCollectors=[]):
max = metricCollectors[0][1]
for item in metricCollectors:
if max < item[1]:
max = item[1]
return max
def startReporting(self, metricCollectors=[], debugMode=False):
"""
Call this method to start reporting to the server, it needs the
metricCollectors parameter that should be provided by the collectables
modul to work properly.
"""
if self.__connect() == False:
print ("An error has occured while connecting to the server on %s" %
(self.server_address + ":" + str(self.server_port)))
if self.__connect() is False:
print("An error has occured while connecting to the server on %s"
% (self.server_address + ":" + str(self.server_port)))
else:
print("Connection established to %s on port %s. \
Report frequency is %d sec. Clientname: %s" %
(self.server_address, self.server_port, self.delay, self.name)
)
Report frequency is %d sec. Clientname: %s"
% (self.server_address, self.server_port,
self.delay, self.name))
try:
maxFrequency = self.getMaxFrequency(metricCollectors)
while True:
metrics = self.__collectFromNode(metricCollectors)
metrics = self.__collectFromNode(metricCollectors)
if self.debugMode == "True":
print(metrics)
self.__send(metrics)
time.sleep(self.delay)
time.sleep(1)
self.beat = self.beat + 1
if ((self.beat % (maxFrequency + 1)) is 0):
self.beat = 1
except KeyboardInterrupt:
print("Reporting has stopped by the user. Exiting...")
finally:
......
import ConfigParser as configparser
def importConf(path_to_file):
config = configparser.RawConfigParser(allow_no_value = False)
config = configparser.RawConfigParser(allow_no_value=False)
try:
config.read(path_to_file)
params = {}
metrics = {}
params["frequency"] = config.get("Client" , "Frequency")
params["debugMode"] = config.get("Client" , "Debug")
params["server_address"] = config.get("Server" , "Address")
params["server_port"] = config.get("Server" , "Port")
params["amqp_queue"] = config.get("AMQP" , "Queue")
params["amqp_user"] = config.get("AMQP" , "User")
params["amqp_pass"] = config.get("AMQP" , "Pass")
params["amqp_virtual_host"] = config.get("AMQP" , "Vhost")
metrics["cpu.usage"] = config.get("Metrics", "cpuUsage")
metrics["memory.usage"] = config.get("Metrics", "memoryUsage")
metrics["user.count"] = config.get("Metrics", "userCount")
metrics["swap.usage"] = config.get("Metrics", "swapUsage")
metrics["system.boot_time"]= config.get("Metrics", "systemBootTime")
metrics["network"] = config.get("Metrics", "dataTraffic")
params["debugMode"] = config.get("Client", "Debug")
metrics["cpu.usage"] = int(config.get("Metrics", "cpuUsage"))
metrics["memory.usage"] = int(config.get("Metrics", "memoryUsage"))
metrics["user.count"] = int(config.get("Metrics", "userCount"))
metrics["swap.usage"] = int(config.get("Metrics", "swapUsage"))
metrics["system.boot_time"] = int(config.get("Metrics",
"systemBootTime"))
metrics["network"] = int(config.get("Metrics", "dataTraffic"))
except configparser.NoSectionError:
print("Config file contains error! Reason: Missing section.")
raise
......@@ -31,5 +26,3 @@ def importConf(path_to_file):
raise
return params, metrics
from metrics import *
class collectables:
__collectables = {
std.cpu.usage.name: [std.cpu.usage],
std.memory.usage.name: [std.memory.usage],
std.swap.usage.name: [std.swap.usage],
std.user.count.name: [std.user.count],
std.network.packages_sent.name: [std.network.packages_sent],
std.network.packages_received.name: [std.network.packages_received],
std.network.bytes_sent.name: [std.network.bytes_sent],
std.network.bytes_received.name: [std.network.bytes_received],
std.system.boot_time.name: [std.system.boot_time],
"network": [std.network.bytes_sent,
std.network.bytes_received,
std.network.packages_sent,
std.network.packages_received],
std.cpu.usage.name: [std.cpu.usage],
std.memory.usage.name: [std.memory.usage],
std.swap.usage.name: [std.swap.usage],
std.user.count.name: [std.user.count],
std.network.packages_sent.name: [std.network.packages_sent],
std.network.packages_received.name: [std.network.packages_received],
std.network.bytes_sent.name: [std.network.bytes_sent],
std.network.bytes_received.name: [std.network.bytes_received],
std.system.boot_time.name: [std.system.boot_time],
"network": [std.network.bytes_sent, std.network.bytes_received,
std.network.packages_sent, std.network.packages_received],
}
@staticmethod
......@@ -31,19 +30,18 @@ class collectables:
return [x.name for x in collectables.__collectables[key]]
@staticmethod
def provide(requests = []):
valid_keys = collectables.listKeys()
reqs = [request for request, value in requests.items() if value=="True"]
def provide(requests=[]):
#valid_keys = collectables.listKeys()
reqs = [request for request, value in requests.items()
if value > 0]
collectors = []
for request in reqs:
for item in collectables.__collectables[request]:
collectors.append(item.harvest)
collectors.append([item.harvest, value])
seen = set()
seen_add = seen.add
return [ x for x in collectors if x not in seen and not seen_add(x)]
return [x for x in collectors if x not in seen and not seen_add(x)]
@staticmethod
def provideAll():
return collectables.provide(collectables.listKeys())
......@@ -3,9 +3,11 @@
import psutil as ps
import collections
####################################################################################
#############################################################################
Metrics = collections.namedtuple("Metrics", ["name", "value"])
class Collection (object):
class Group (object):
class Metric (object):
......@@ -19,15 +21,15 @@ class Collection (object):
query = cls.collector_function.im_func(**cls.collector_function_arguments)
if ((isinstance(query, list)) or (isinstance(query, dict))):
return Metrics(cls.name,
query[cls.collector_function_result_attr])
elif (isinstance(query,tuple)):
query[cls.collector_function_result_attr])
elif (isinstance(query, tuple)):
return Metrics(cls.name,
query.__getattribute__(cls.collector_function_result_attr))
query.__getattribute__(cls.collector_function_result_attr))
else:
return Metrics(cls.name,
query)
return Metrics(cls.name, query)
##############################################################################
####################################################################################
class std (Collection):
......@@ -58,6 +60,7 @@ class std (Collection):
class count (Collection.Group.Metric):
name = "user.count"
@classmethod
def harvest(cls):
return Metrics(cls.name, len(ps.get_users()))
......@@ -75,9 +78,9 @@ class std (Collection):
collector_function_result_attr = "packets_recv"
class bytes_sent (Collection.Group.Metric):
name = "network.bytes_sent"
collector_function = ps.network_io_counters
collector_function_result_attr = "bytes_sent"
name = "network.bytes_sent"
collector_function = ps.network_io_counters
collector_function_result_attr = "bytes_sent"
class bytes_received(Collection.Group.Metric):
name = "network.bytes_received"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment