Commit 63ed7658 by Nagy Gergő

Working client for monitoring the cloud

parents
[Client]
Frequency = 5
Debug = True
[Server]
Address = 127.0.0.1
Port = 5672
[AMQP]
Queue = graphite
User = guest
Pass = guest
[Metrics]
cpuUsage = True
memoryUsage = True
userCount = True
swapUsage = True
systemBootTime = False
packageTraffic = False
dataTraffic = False
[Client]
Freuency = 5
Debug = True
[Server]
Address = 127.0.0.1
Port = 5672
[AMQP]
Queue = graphite
User = guest
Pass = guest
[Metrics]
cpuUsage = True
memoryUsage = True
userCount = True
swapUsage = True
systemBootTime = False
packageTraffic = False
dataTraffic = False
import sys
from src import cnfparse
def main():
if (len(sys.argv)<2):
print("usage: manage.py run [options]")
print("""
options:
--config <path> : path to the configuration file you intend
to start the client with
--debug : enables the debug mode and writes metrics sent to the
server
""")
if (len(sys.argv)==2 and sys.argv[1]=="run"):
configuration = cnfparse.importConf("config/client.conf")
if __name__ == "__main__":
main()
__all__ = ["client", "collectables", "metrics", "cnfparse"]
#!/usr/bin/python
import platform, collections, time, socket, pika, struct
class Client:
def __init__(self, config):
"""
Constructor of the client requires a configuration provided by cnfparse
modul. It is a dictionary: {server_address, server_port, frequency,
debugMode, amqp_user, amqp_pass, amqp_queue}.
"""
self.name = "circle." + socket.gethostname().split.('.').reverse()
self.server_address = str(config["server_address"])
self.server_port = int(config["server_port"])
self.delay = int(config["frequency"])
self.debug = config["debugMode"]
self.amqp_user = config["amqp_user"]
self.amqp_pass = config["amqp_pass"]
self.amqp_queue = config["aqmp_queue"]
def __connect(self):
"""
This method creates the connection to the queue of the graphite server.
Returns true if the connection was successful.
"""
try:
credentials = pika.PlainCredentials(self.amqp_user, self.amqp_pass)
self.connection = pika.BlockingConnection(pikaConnectionParameters(
host=self.server_address,
port=self.server_port,
credentials=credentials
)
)
self.channel = self.connection.channel()
return True
except:
return False
def __disconnect(self):
"""
Break up the connection to the graphite server.
"""
self.channel.close()
self.connection.close()
def __send(self, message):
"""
Send the message given in the parameters.
"""
self.channel.basic_publish(exchange='graphite', routing_key='',
body="\n".join(message))
def __collectFromNode(self, metricCollectors):
"""
It harvests the given metrics in the metricCollectors list. This list
should be provided by the collectables modul.
"""
metrics = []
for collector in metricCollectors:
stat = collector()
metrics.append((
self.name + "." + stat.name +
" %d" % (stat.value) +
" %d" % (time.time())
))
return metrics
def startReporting(self, metricCollectors = [], debugMode = False):
"""
Call this method to start reporting to the server, it needs the
metricCollectors parameter that should be provided by the collectables
modul to work properly.
"""
if self.__connect() == False:
print ("An error has occured while connecting to the server on %s" %
(self.server_address + ":" + str(self.server_port)))
return -1
else:
print("Connection established to %s on port %s. \
Report frequency is %d sec. Clientname: %s" %
(self.server_address, self.server_port, self.delay, self.name)
)
try:
while True:
metrics = self.__collectFromNode(metricCollectors)
if self.debugMode == True:
print(metrics)
self.__send(metrics)
time.sleep(self.delay)
except KeyboardInterrupt:
print("Reporting has stopped by the user. Exiting...")
finally:
self.__disconnect()
import configparser
def importConf(path_to_file):
config = configparser.RawConfigParser(allow_no_value = False)
try:
config.read(path_to_file)
params = {}
params["frequency"] = config.get("Client" , "Frequency")
params["debugMode"] = config.get("Client" , "Debug")
params["server_address"] = config.get("Server" , "Address")
params["server_port"] = config.get("Server" , "Port")
params["aqmp_queue"] = config.get("AMQP" , "Queue")
params["aqmp_user"] = config.get("AMQP" , "User")
params["aqmp_pass"] = config.get("AMQP" , "Pass")
params["cpu.usage"] = config.get("Metrics", "cpuUsage")
params["memory.usage"] = config.get("Metrics", "memoryUsage")
params["user.count"] = config.get("Metrics", "userCount")
params["swap.usage"] = config.get("Metrics", "swapUsage")
params["system.boot_time"]= config.get("Metrics", "systemBootTime")
params["package.traffic"] = config.get("Metrics", "packageTraffic")
params["data.traffic"] = config.get("Metrics", "dataTraffic")
except configparser.NoSectionError:
print("Config file contains error! Reason: Missing section.")
raise
except configparser.ParsingError:
print("Config file contains error! Reason: Cannot parse.")
raise
except configparser.MissingSectionHeader:
print("Config file contains error! Reason: Missing section-header.")
raise
return params
from metrics import *
class collectables:
__collectables = {
"cpu.usage": [std.cpu.usage],
"memory.usage": [std.memory.usage],
"network.packages_sent": [std.network.packages_sent],
"network.packages_received": [std.network.packages_received],
"network.bytes_sent": [std.network.bytes_sent],
"network.bytes_received": [std.network.bytes_received],
"network": [std.network.bytes_sent,
std.network.bytes_received,
std.network.packages_sent,
std.network.packages_received],
}
@staticmethod
def list():
return list(collectables.__collectables.keys())
@staticmethod
def keyToSet(key):
return collectables.__collectables[key]
@staticmethod
def provide( requests = []):
collectors = []
for request in requests:
for item in collectables.__collectables[request]:
collectors.append(item.harvest)
seen = set()
seen_add = seen.add
return [ x for x in collectors if x not in seen and not seen_add(x)]
@staticmethod
def provideAll():
return collectables.provide(collectables.list())
#!/usr/bin/python
import psutil as ps
import collections
####################################################################################
Metrics = collections.namedtuple("Metrics", ["name", "value"])
class Collection (object):
class Group (object):
class Metric (object):
@staticmethod
def harvest():
raise NotImplementedError("You must implement the harvest method.")
####################################################################################
class std (Collection):
class cpu (Collection.Group):
class usage (Collection.Group.Metric):
@staticmethod
def harvest():
return Metrics("cpu.usage", ps.cpu_percent(0))
class memory (Collection.Group):
class usage(Collection.Group.Metric):
@staticmethod
def harvest():
return Metrics("memory.usage", ps.virtual_memory().percent)
class user (Collection.Group):
class count (Collection.Group.Metric):
@staticmethod
def harvest():
return Metrics("user.count", len(ps.get_users()))
class network (Collection.Group):
class packages_sent (Collection.Group.Metric):
@staticmethod
def harvest():
return Metrics("network.packages_sent", ps.net_io_counters().packets_sent)
class packages_received (Collection.Group.Metric):
@staticmethod
def harvest():
return Metrics("network.packages_received", ps.net_io_counters().packets_recv)
class bytes_sent (Collection.Group.Metric):
@staticmethod
def harvest():
return Metrics("network.bytes_sent", ps.net_io_counters().bytes_sent)
class bytes_received(Collection.Group.Metric):
@staticmethod
def harvest():
return Metrics("network.bytes_recv", ps.net_io_counters().bytes_recv)
class system (Collection.Group):
class boot_time (Collection.Group.Metric):
@staticmethod
def harvest():
return Metrics("system.boot_time", ps.get_boot_time())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment