Commit 54dadb4e by Gregory Nagy

Client vm report path vm. domain added

parent d4be0ad2
...@@ -12,267 +12,268 @@ logging.basicConfig() ...@@ -12,267 +12,268 @@ logging.basicConfig()
class Client: class Client:
def __init__(self, config):
"""
Constructor of the client class that is responsible for handling the
communication between the graphite server and the data source. In
order to initialize a client you must have the following
environmental varriables:
- GRAPHITE_SERVER_ADDRESS:
- GRAPHITE_SERVER_PORT:
- GRAPHITE_AMQP_USER:
- GRAPHITE_AMQP_PASSWORD:
- GRAPHITE_AMQP_QUEUE:
- GRAPHITE_AMQP_VHOST:
Missing only one of these variables will cause the client not to work.
"""
hostname = socket.gethostname().split('.')
hostname.reverse()
self.name = "circle." + ".".join(hostname)
if os.getenv("GRAPHITE_SERVER_ADDRESS") is "":
print("GRAPHITE_SERVER_ADDRESS cannot be found in environmental "
"variables"
)
self.valid = False
return
if os.getenv("GRAPHITE_SERVER_PORT") is "":
print("GRAPHITE_SERVER_PORT cannot be found in environmental "
"variables. (AMQP standard is: 5672"
)
self.valid = False
return
if os.getenv("GRAPHITE_AMQP_USER") is "" or os.getenv(
"GRAPHITE_AMQP_PASSWORD") is "":
print("GRAPHITE_AMQP_USER or GRAPHITE_AMQP_PASSWORD cannot be "
"found in environmental variables. (AMQP standard is: "
"guest-guest)"
)
self.valid = False
return
if os.getenv("GRAPHITE_AMQP_QUEUE") is "" or os.getenv(
"GRAPHITE_AMQP_VHOST") is "":
print("GRAPHITE_AMQP_QUEUE or GRAPHITE_AMQP_VHOST cannot be "
"found in environmental variables."
)
self.valid = False
return
self.server_address = str(os.getenv("GRAPHITE_SERVER_ADDRESS"))
self.server_port = int(os.getenv("GRAPHITE_SERVER_PORT"))
self.amqp_user = str(os.getenv("GRAPHITE_AMQP_USER"))
self.amqp_pass = str(os.getenv("GRAPHITE_AMQP_PASSWORD"))
self.amqp_queue = str(os.getenv("GRAPHITE_AMQP_QUEUE"))
self.amqp_vhost = str(os.getenv("GRAPHITE_AMQP_VHOST"))
self.debugMode = config["debugMode"]
self.kvmCPU = int(config["kvmCpuUsage"])
self.kvmMem = int(config["kvmMemoryUsage"])
self.kvmNet = int(config["kvmNetworkUsage"])
self.beat = 1
self.valid = True
def __init__(self, config):
"""
Constructor of the client class that is responsible for handling the
communication between the graphite server and the data source. In
order to initialize a client you must have the following
environmental varriables:
- GRAPHITE_SERVER_ADDRESS:
- GRAPHITE_SERVER_PORT:
- GRAPHITE_AMQP_USER:
- GRAPHITE_AMQP_PASSWORD:
- GRAPHITE_AMQP_QUEUE:
- GRAPHITE_AMQP_VHOST:
Missing only one of these variables will cause the client not to work.
"""
hostname = socket.gethostname().split('.')
hostname.reverse()
self.name = "circle." + ".".join(hostname)
if os.getenv("GRAPHITE_SERVER_ADDRESS") is "":
print("GRAPHITE_SERVER_ADDRESS cannot be found in environmental "
"variables"
)
self.valid = False
return
if os.getenv("GRAPHITE_SERVER_PORT") is "":
print("GRAPHITE_SERVER_PORT cannot be found in environmental "
"variables. (AMQP standard is: 5672"
)
self.valid = False
return
if os.getenv("GRAPHITE_AMQP_USER") is "" or os.getenv(
"GRAPHITE_AMQP_PASSWORD") is "":
print("GRAPHITE_AMQP_USER or GRAPHITE_AMQP_PASSWORD cannot be "
"found in environmental variables. (AMQP standard is: "
"guest-guest)"
)
self.valid = False
return
if os.getenv("GRAPHITE_AMQP_QUEUE") is "" or os.getenv(
"GRAPHITE_AMQP_VHOST") is "":
print("GRAPHITE_AMQP_QUEUE or GRAPHITE_AMQP_VHOST cannot be "
"found in environmental variables."
)
self.valid = False
return
self.server_address = str(os.getenv("GRAPHITE_SERVER_ADDRESS"))
self.server_port = int(os.getenv("GRAPHITE_SERVER_PORT"))
self.amqp_user = str(os.getenv("GRAPHITE_AMQP_USER"))
self.amqp_pass = str(os.getenv("GRAPHITE_AMQP_PASSWORD"))
self.amqp_queue = str(os.getenv("GRAPHITE_AMQP_QUEUE"))
self.amqp_vhost = str(os.getenv("GRAPHITE_AMQP_VHOST"))
self.debugMode = config["debugMode"]
self.kvmCPU = int(config["kvmCpuUsage"])
self.kvmMem = int(config["kvmMemoryUsage"])
self.kvmNet = int(config["kvmNetworkUsage"])
self.beat = 1
self.valid = True
def __connect(self): def __connect(self):
""" """
This method creates the connection to the queue of the graphite This method creates the connection to the queue of the graphite
server using the environmental variables given in the constructor. server using the environmental variables given in the constructor.
Returns true if the connection was successful. Returns true if the connection was successful.
""" """
try: try:
credentials = pika.PlainCredentials(self.amqp_user, self.amqp_pass) credentials = pika.PlainCredentials(self.amqp_user, self.amqp_pass)
params = pika.ConnectionParameters(host=self.server_address, params = pika.ConnectionParameters(host=self.server_address,
port=self.server_port, port=self.server_port,
virtual_host=self.amqp_vhost, virtual_host=self.amqp_vhost,
credentials=credentials) credentials=credentials)
self.connection = pika.BlockingConnection(params) self.connection = pika.BlockingConnection(params)
self.channel = self.connection.channel() self.channel = self.connection.channel()
return True return True
except RuntimeError: except RuntimeError:
print ("[ERROR] Cannot connect to the server. " print ("[ERROR] Cannot connect to the server. "
"Parameters could be wrong." "Parameters could be wrong."
) )
return False return False
except: except:
print ("[ERROR] Cannot connect to the server. There is no one " print ("[ERROR] Cannot connect to the server. There is no one "
"listening on the other side." "listening on the other side."
) )
return False return False
def __disconnect(self): def __disconnect(self):
""" """
Break up the connection to the graphite server. If something went Break up the connection to the graphite server. If something went
wrong while disconnecting it simply cut the connection up. wrong while disconnecting it simply cut the connection up.
""" """
try: try:
self.channel.close() self.channel.close()
self.connection.close() self.connection.close()
except RuntimeError: except RuntimeError:
print("[ERROR] An error has occured while disconnecting from the " print("[ERROR] An error has occured while disconnecting from the "
"server." "server."
) )
def __send(self, message): def __send(self, message):
""" """
Send the message given in the parameters given in the message Send the message given in the parameters given in the message
parameter. This function expects that the graphite server want the parameter. This function expects that the graphite server want the
metric name given in the message body. (This option must be enabled metric name given in the message body. (This option must be enabled
on the server. Otherwise it can't parse the data sent.) on the server. Otherwise it can't parse the data sent.)
""" """
try: try:
self.channel.basic_publish(exchange=self.amqp_queue, self.channel.basic_publish(exchange=self.amqp_queue,
routing_key='', body="\n".join(message)) routing_key='', body="\n".join(message))
return True return True
except: except:
print("[ERROR] An error has occured while sending metrics to the " print("[ERROR] An error has occured while sending metrics to the "
"server." "server."
) )
return False return False
def __collectFromNode(self, metricCollectors): def __collectFromNode(self, metricCollectors):
""" """
It harvests the given metrics in the metricCollectors list. This list It harvests the given metrics in the metricCollectors list. This list
should be provided by the collectables modul. It is important that should be provided by the collectables modul. It is important that
only the information collected from the node is provided here. only the information collected from the node is provided here.
""" """
metrics = [] metrics = []
for collector in metricCollectors: for collector in metricCollectors:
if (self.beat % collector[1]) is 0: if (self.beat % collector[1]) is 0:
stat = collector[0]() stat = collector[0]()
metrics.append((self.name + "." + metrics.append((self.name + "." +
stat.name + " %d" % (stat.value) stat.name + " %d" % (stat.value)
+ " %d" % (time.time()) + " %d" % (time.time())
)) ))
return metrics return metrics
def __collectFromVMs(self): def __collectFromVMs(self):
""" """
This method is used for fetching the kvm processes running on the This method is used for fetching the kvm processes running on the
node and using the cmdline parameters calculates different types of node and using the cmdline parameters calculates different types of
resource usages about the vms. resource usages about the vms.
""" """
metrics = [] metrics = []
running_vms = [] running_vms = []
procList = psutil.get_process_list() procList = psutil.get_process_list()
for entry in procList: for entry in procList:
if entry.name in "kvm": if entry.name in "kvm":
cmdLine = entry.as_dict()["cmdline"] cmdLine = entry.as_dict()["cmdline"]
search = [cmd_param_index for cmd_param_index, cmd_param in search = [cmd_param_index for cmd_param_index, cmd_param in
enumerate(cmdLine) enumerate(cmdLine)
if cmd_param == "-name"] if cmd_param == "-name"]
if not entry.is_running(): if not entry.is_running():
break break
memory = [cmd_param_index for cmd_param_index, cmd_param in memory = [cmd_param_index for cmd_param_index, cmd_param in
enumerate(cmdLine) enumerate(cmdLine)
if cmd_param == "-m"] if cmd_param == "-m"]
if not entry.is_running(): if not entry.is_running():
break break
try: try:
running_vms.append([cmdLine[search[0] + 1], running_vms.append([cmdLine[search[0] + 1],
entry.pid, entry.pid,
int(entry.as_dict()["cmdline"][ int(entry.as_dict()["cmdline"][
memory[0] + 1])]) memory[0] + 1])])
except IndexError: except IndexError:
pass pass
if ((self.beat % 30) is 0): if ((self.beat % 30) is 0):
metrics.append((self.name + "." + "vmcount" + metrics.append((self.name + "." + "vmcount" +
" %d" % len(running_vms) " %d" % len(running_vms)
+ " %d" % (time.time()) + " %d" % (time.time())
)) ))
for vm in running_vms: for vm in running_vms:
vm_proc = psutil.Process(vm[1]) vm_proc = psutil.Process(vm[1])
if ((self.beat % self.kvmCPU) is 0) and vm_proc.is_running(): if ((self.beat % self.kvmCPU) is 0) and vm_proc.is_running():
metrics.append((self.name + "." + "kvm." + metrics.append(("vm." +
vm[0] + "." + "memory.usage" + vm[0] + "." + "memory.usage" +
" %d" % ( " %d" % (
vm_proc.get_memory_percent() / 100 * vm[2]) vm_proc.get_memory_percent() / 100 * vm[2])
+ " %d" % (time.time()) + " %d" % (time.time())
)) ))
if ((self.beat % self.kvmMem) is 0) and vm_proc.is_running(): if ((self.beat % self.kvmMem) is 0) and vm_proc.is_running():
metrics.append((self.name + "." + "kvm." + metrics.append(("vm." +
vm[0] + "." + "cpu.usage" + vm[0] + "." + "cpu.usage" +
" %d" % (vm_proc.get_cpu_times().system + " %d" % (vm_proc.get_cpu_times().system +
vm_proc.get_cpu_times().user) vm_proc.get_cpu_times().user)
+ " %d" % (time.time()) + " %d" % (time.time())
)) ))
interfaces_list = psutil.network_io_counters( interfaces_list = psutil.network_io_counters(
pernic=True) pernic=True)
interfaces_list_enum = enumerate(interfaces_list) interfaces_list_enum = enumerate(interfaces_list)
if ((self.beat % self.kvmNet) is 0) and vm_proc.is_running(): if ((self.beat % self.kvmNet) is 0) and vm_proc.is_running():
for vm in running_vms: for vm in running_vms:
for iname_index, iname in interfaces_list_enum: for iname_index, iname in interfaces_list_enum:
if vm[0] in iname: if vm[0] in iname:
metrics.append((self.name + "." + "kvm." + metrics.append(("vm." +
vm[ vm[0] +
0] + "." + "network.packages_sent" + "." + "network.packages_sent" +
" %d" % interfaces_list[ " %d" % interfaces_list[
iname].packets_sent iname].packets_sent
+ " %d" % (time.time()) + " %d" % (time.time())
)) ))
metrics.append((self.name + "." + "kvm." + metrics.append(("vm." +
vm[ vm[0] +
0] + "." + "network.packages_recv" + "." + "network.packages_recv" +
" %d" % interfaces_list[ " %d" % interfaces_list[
iname].packets_recv iname].packets_recv
+ " %d" % (time.time()) + " %d" % (time.time())
)) ))
metrics.append((self.name + "." + "kvm." + metrics.append(("vm." +
vm[0] + "." + "network" vm[0] + "." + "network"
".bytes_sent" + ".bytes_sent" +
" %d" % interfaces_list[ " %d" % interfaces_list[
iname].bytes_sent iname].bytes_sent
+ " %d" % (time.time()) + " %d" % (time.time())
)) ))
metrics.append((self.name + "." + "kvm." + metrics.append(("vm." +
vm[0] + "." + "network.bytes_recv" + vm[0] +
" %d" % interfaces_list[ + "network.bytes_recv" +
iname].bytes_recv " %d" %
+ " %d" % (time.time()) interfaces_list[iname].bytes_recv
)) + " %d" % (time.time())
return metrics ))
return metrics
def getMaxFrequency(self, metricCollectors=[]): def getMaxFrequency(self, metricCollectors=[]):
""" """
""" """
items = metricCollectors + [["kvmCpuUsage", self.kvmMem], [ items = metricCollectors + [["kvmCpuUsage", self.kvmMem], [
"kvmMemoryUsage", self.kvmCPU], ["kvmNetworkUsage", self.kvmNet]] "kvmMemoryUsage", self.kvmCPU], ["kvmNetworkUsage", self.kvmNet]]
max = items[0][1] max = items[0][1]
for item in items: for item in items:
if max < item[1]: if max < item[1]:
max = item[1] max = item[1]
return max return max
def startReporting(self, metricCollectors=[]): def startReporting(self, metricCollectors=[]):
""" """
Call this method to start reporting to the server, it needs the Call this method to start reporting to the server, it needs the
metricCollectors parameter that should be provided by the collectables metricCollectors parameter that should be provided by the collectables
modul to work properly. modul to work properly.
""" """
if self.valid is False: if self.valid is False:
print("[ERROR] The client cannot be started.") print("[ERROR] The client cannot be started.")
raise RuntimeError raise RuntimeError
if self.__connect() is False: if self.__connect() is False:
print("[ERROR] An error has occured while connecting to the " print("[ERROR] An error has occured while connecting to the "
"server on %s." "server on %s."
% (self.server_address + ":" + str(self.server_port))) % (self.server_address + ":" + str(self.server_port)))
else: else:
print("[SUCCESS] Connection established to %s on port %s. \ print("[SUCCESS] Connection established to %s on port %s. \
Clientname: %s" Clientname: %s"
% (self.server_address, self.server_port, % (self.server_address, self.server_port,
self.name)) self.name))
try: try:
maxFrequency = self.getMaxFrequency(metricCollectors) maxFrequency = self.getMaxFrequency(metricCollectors)
while True: while True:
nodeMetrics = self.__collectFromNode(metricCollectors) nodeMetrics = self.__collectFromNode(metricCollectors)
vmMetrics = self.__collectFromVMs() vmMetrics = self.__collectFromVMs()
metrics = nodeMetrics + vmMetrics metrics = nodeMetrics + vmMetrics
if self.debugMode == "True": if self.debugMode == "True":
print(metrics) print(metrics)
if len(metrics) is not 0: if len(metrics) is not 0:
if self.__send(metrics) is False: if self.__send(metrics) is False:
raise RuntimeError raise RuntimeError
time.sleep(1) time.sleep(1)
self.beat = self.beat + 1 self.beat = self.beat + 1
if ((self.beat % (maxFrequency + 1)) is 0): if ((self.beat % (maxFrequency + 1)) is 0):
self.beat = 1 self.beat = 1
except KeyboardInterrupt: except KeyboardInterrupt:
print("[x] Reporting has stopped by the user. Exiting...") print("[x] Reporting has stopped by the user. Exiting...")
finally: finally:
self.__disconnect() self.__disconnect()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment