client.py 7.18 KB
Newer Older
1 2
#!/usr/bin/python

3 4 5
import time
import socket
import pika
6
import psutil
7
import logging
8 9 10
import os


11
# Install the default handler on the root logger at import time (presumably so
# that log records emitted by imported libraries such as pika are not dropped
# -- TODO confirm the intent).
logging.basicConfig()
12

13

14
class Client:
    """
    Collect metrics from the local node and from the kvm processes running
    on it, and publish them once per second to a graphite server through an
    AMQP exchange.

    Every metric is a string of the form ``"<name> <value> <timestamp>"``
    where the metric name is prefixed with ``self.name``
    (``"circle."`` followed by the reversed host name).
    """

    def __init__(self, config):
        """
        Build a client from the environment and from ``config``.

        The following environmental variables must be set and non-empty:
        - GRAPHITE_SERVER_ADDRESS
        - GRAPHITE_SERVER_PORT
        - GRAPHITE_AMQP_USER
        - GRAPHITE_AMQP_PASSWORD
        - GRAPHITE_AMQP_QUEUE
        - GRAPHITE_AMQP_VHOST
        If any of them is missing a message is printed, ``self.configured``
        stays ``False`` and ``startReporting`` refuses to run.

        ``config`` must provide the keys ``"debugMode"``, ``"kvmCpuUsage"``
        and ``"kvmMemoryUsage"``; the latter two are converted with ``int``
        and are the collection intervals (in beats) of the per-VM metrics.

        Raises ``ValueError`` when the port or an interval is not numeric and
        ``KeyError`` when a ``config`` key is missing.
        """
        hostname = socket.gethostname().split('.')
        hostname.reverse()
        self.name = "circle." + ".".join(hostname)

        # Defined up front so that every other method can rely on them even
        # when the constructor bails out early or the connection fails.
        self.connection = None
        self.channel = None
        self.beat = 1
        self.configured = False

        # os.getenv returns None for an unset variable, so test truthiness
        # (covers both "unset" and "set to an empty string").
        if not os.getenv("GRAPHITE_SERVER_ADDRESS"):
            print("GRAPHITE_SERVER_ADDRESS cannot be found in environmental "
                  "variables")
            return
        if not os.getenv("GRAPHITE_SERVER_PORT"):
            print("GRAPHITE_SERVER_PORT cannot be found in environmental "
                  "variables. (AMQP standard is: 5672)")
            return
        if not (os.getenv("GRAPHITE_AMQP_USER")
                and os.getenv("GRAPHITE_AMQP_PASSWORD")):
            print("GRAPHITE_AMQP_USER or GRAPHITE_AMQP_PASSWORD cannot be "
                  "found in environmental variables. (AMQP standard is: "
                  "guest-guest)")
            return
        if not (os.getenv("GRAPHITE_AMQP_QUEUE")
                and os.getenv("GRAPHITE_AMQP_VHOST")):
            print("GRAPHITE_AMQP_QUEUE or GRAPHITE_AMQP_VHOST cannot be "
                  "found in environmental variables.")
            return

        self.server_address = str(os.getenv("GRAPHITE_SERVER_ADDRESS"))
        self.server_port = int(os.getenv("GRAPHITE_SERVER_PORT"))
        self.amqp_user = str(os.getenv("GRAPHITE_AMQP_USER"))
        self.amqp_pass = str(os.getenv("GRAPHITE_AMQP_PASSWORD"))
        self.amqp_queue = str(os.getenv("GRAPHITE_AMQP_QUEUE"))
        self.amqp_vhost = str(os.getenv("GRAPHITE_AMQP_VHOST"))

        # NOTE(review): debugMode is compared against the string "True" in
        # startReporting, so a boolean True does not enable debug output.
        self.debugMode = config["debugMode"]
        self.kvmCPU = int(config["kvmCpuUsage"])
        self.kvmMem = int(config["kvmMemoryUsage"])
        self.configured = True

    def _isDue(self, interval):
        """
        Return True when a metric collected every ``interval`` beats has to
        be collected on the current beat. A non-positive interval is never
        due (instead of dividing by zero).
        """
        return interval > 0 and self.beat % interval == 0

    @staticmethod
    def _cmdlineValue(cmdline, flag):
        """
        Return the argument following the first occurrence of ``flag`` in
        the ``cmdline`` list, or None when the flag is absent or is the last
        argument.
        """
        for index, param in enumerate(cmdline[:-1]):
            if param == flag:
                return cmdline[index + 1]
        return None

    def __connect(self):
        """
        Open the connection and the channel to the AMQP server described by
        the attributes set in the constructor.

        Returns True if the connection was successful, False otherwise (an
        error message is printed in that case).
        """
        try:
            credentials = pika.PlainCredentials(self.amqp_user,
                                                self.amqp_pass)
            params = pika.ConnectionParameters(host=self.server_address,
                                               port=self.server_port,
                                               virtual_host=self.amqp_vhost,
                                               credentials=credentials)
            self.connection = pika.BlockingConnection(params)
            self.channel = self.connection.channel()
            return True
        except RuntimeError:
            print("[ERROR] Cannot connect to the server. "
                  "Parameters could be wrong.")
            return False
        except Exception:
            # Not a bare except: Ctrl-C / SystemExit must still propagate.
            print("[ERROR] Cannot connect to the server. There is no one "
                  "listening on the other side.")
            return False

    def __disconnect(self):
        """
        Close the channel and the connection, if they were ever opened.

        A failure while closing one of them is reported and does not prevent
        the other one from being closed.
        """
        for resource in (self.channel, self.connection):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                print("[ERROR] An error has occured while disconnecting "
                      "from the server.")
        self.channel = None
        self.connection = None

    def __send(self, message):
        """
        Publish ``message`` (a list of metric strings) as one newline
        separated body to the exchange named by ``self.amqp_queue``.

        The metric names travel inside the message body, so the graphite
        server must be configured to read them from there.

        Returns True on success, False otherwise (an error message is
        printed in that case).
        """
        try:
            self.channel.basic_publish(exchange=self.amqp_queue,
                                       routing_key='',
                                       body="\n".join(message))
            return True
        except Exception:
            # Not a bare except: Ctrl-C / SystemExit must still propagate.
            print("[ERROR] An error has occured while sending metrics to "
                  "the server.")
            return False

    def __collectFromNode(self, metricCollectors):
        """
        Run every collector of ``metricCollectors`` that is due on the
        current beat and return the resulting list of metric strings.

        Each item is indexed as ``[collector, interval]``: ``collector`` is
        called without arguments and must return an object with ``name`` and
        ``value`` attributes, ``interval`` is the collection period in beats.
        """
        metrics = []
        for collector in metricCollectors:
            if self._isDue(collector[1]):
                stat = collector[0]()
                metrics.append("%s.%s %d %d" % (self.name, stat.name,
                                                stat.value, time.time()))
        return metrics

    def _vmMetrics(self, vm_name, vm_memory, vm_proc):
        """
        Return the metric strings that are due on the current beat for one
        virtual machine.

        ``vm_memory`` is the value of the ``-m`` command line parameter and
        ``vm_proc`` is the process object of the VM; memory usage is reported
        as the process' memory percentage applied to ``vm_memory``, cpu usage
        as the sum of the system and user cpu times.
        """
        metrics = []
        prefix = "%s.kvm.%s" % (self.name, vm_name)
        # kvmMem comes from config["kvmMemoryUsage"], kvmCPU from
        # config["kvmCpuUsage"]; each one gates its own metric.
        if self._isDue(self.kvmMem):
            used = vm_proc.get_memory_percent() / 100 * vm_memory
            metrics.append("%s.memory.usage %d %d"
                           % (prefix, used, time.time()))
        if self._isDue(self.kvmCPU):
            cpu_times = vm_proc.get_cpu_times()
            metrics.append("%s.cpu.usage %d %d"
                           % (prefix, cpu_times.system + cpu_times.user,
                              time.time()))
        return metrics

    def __collectFromVMs(self):
        """
        Find the processes named "kvm" running on the node and return their
        resource usage metrics, using the ``-name`` and ``-m`` command line
        parameters as the VM name and memory size.

        Processes lacking either parameter are skipped.
        """
        # NOTE(review): get_process_list / entry.name / get_memory_percent /
        # get_cpu_times look like the legacy psutil API names -- confirm
        # against the installed psutil version before upgrading it.
        metrics = []
        for entry in psutil.get_process_list():
            if entry.name != "kvm":
                continue
            # as_dict() is costly, so query it once per process.
            cmdline = entry.as_dict()["cmdline"]
            vm_name = self._cmdlineValue(cmdline, "-name")
            vm_memory = self._cmdlineValue(cmdline, "-m")
            if vm_name is None or vm_memory is None:
                continue
            metrics.extend(self._vmMetrics(vm_name, int(vm_memory),
                                           psutil.Process(entry.pid)))
        return metrics

    def getMaxFrequency(self, metricCollectors=None):
        """
        Return the largest collection interval among the given collectors
        (``item[1]`` of each) and the two per-VM intervals.
        """
        intervals = [collector[1] for collector in (metricCollectors or [])]
        intervals.extend([self.kvmCPU, self.kvmMem])
        return max(intervals)

    def startReporting(self, metricCollectors=None):
        """
        Connect to the server and report the metrics once per second until
        interrupted with Ctrl-C.

        ``metricCollectors`` is the list of ``[collector, interval]`` items
        used for the node level metrics (see ``__collectFromNode``).

        Returns without reporting when the client is not configured or the
        connection cannot be established. Raises ``RuntimeError`` when
        sending the metrics fails; the connection is closed in every case.
        """
        if metricCollectors is None:
            metricCollectors = []
        if not self.configured:
            print("[ERROR] The client is not configured, check the "
                  "GRAPHITE_* environmental variables.")
            return
        if not self.__connect():
            print("[ERROR] An error has occured while connecting to the "
                  "server on %s."
                  % (self.server_address + ":" + str(self.server_port)))
            # Nothing to report to: do not enter the send loop.
            return
        print("[SUCCESS] Connection established to %s on port %s. "
              "Clientname: %s"
              % (self.server_address, self.server_port, self.name))
        try:
            maxFrequency = self.getMaxFrequency(metricCollectors)
            while True:
                metrics = (self.__collectFromNode(metricCollectors)
                           + self.__collectFromVMs())
                if self.debugMode == "True":
                    print(metrics)
                if not self.__send(metrics):
                    raise RuntimeError(
                        "sending the metrics to the server failed")
                time.sleep(1)
                self.beat += 1
                # NOTE(review): the beat wraps after the largest interval,
                # so an interval that does not divide it fires at an
                # irregular cadence -- confirm whether that is intended.
                if self.beat % (maxFrequency + 1) == 0:
                    self.beat = 1
        except KeyboardInterrupt:
            print("[x] Reporting has stopped by the user. Exiting...")
        finally:
            self.__disconnect()