client.py 8.69 KB
Newer Older
1 2
#!/usr/bin/python

3
from itertools import islice
4 5
from socket import gethostname
import argparse
6
import logging
7
import os
8 9 10
import pika
import psutil
import time
11

12
logger = logging.getLogger(__name__)
13

14

15
class Client(object):
    """Collect node and VM metrics and publish them to graphite over AMQP.

    The connection settings are read from environment variables (see
    ``env_config``); ``run`` drives the periodic collect-and-send loop.
    """

    # Maps the instance attribute name to the environment variable whose
    # value is stored in that attribute by ``__init__``.
    env_config = {
        "server_address": "GRAPHITE_HOST",
        "server_port": "GRAPHITE_PORT",
        "amqp_user": "GRAPHITE_AMQP_USER",
        "amqp_pass": "GRAPHITE_AMQP_PASSWORD",
        "amqp_queue": "GRAPHITE_AMQP_QUEUE",
        "amqp_vhost": "GRAPHITE_AMQP_VHOST",
    }

    def __init__(self):
        """
        Constructor of the client class that is responsible for handling the
        communication between the graphite server and the data source. In
        order to initialize a client the following environment variables
        must be set to a non-empty value:
        - GRAPHITE_HOST
        - GRAPHITE_PORT
        - GRAPHITE_AMQP_USER
        - GRAPHITE_AMQP_PASSWORD
        - GRAPHITE_AMQP_QUEUE
        - GRAPHITE_AMQP_VHOST
        Each value is stored on the instance under the attribute name given
        in ``env_config``.

        Raises RuntimeError if any of these variables is missing or empty.
        """
        self.name = 'circle.%s' % gethostname()
        for var, env_var in self.env_config.items():
            value = os.getenv(env_var, "")
            if not value:
                raise RuntimeError('%s environment variable missing' % env_var)
            setattr(self, var, value)

    def connect(self):
        """
        Create the connection and the channel to the AMQP server using the
        settings read from the environment in the constructor. The results
        are stored in ``self.connection`` and ``self.channel``.

        Nothing is returned; any error is logged and re-raised.
        """
        try:
            credentials = pika.PlainCredentials(self.amqp_user, self.amqp_pass)
            params = pika.ConnectionParameters(host=self.server_address,
                                               port=int(self.server_port),
                                               virtual_host=self.amqp_vhost,
                                               credentials=credentials)
            self.connection = pika.BlockingConnection(params)
            self.channel = self.connection.channel()
            logger.info('Connection established to %s.', self.server_address)
        except RuntimeError:
            logger.error('Cannot connect to the server. '
                         'Parameters may be wrong.')
            logger.error("An error has occured while connecting to the server")
            raise
        except Exception:
            # Exception (not a bare except) so that KeyboardInterrupt and
            # SystemExit are not reported as connection failures.
            logger.error('Cannot connect to the server. There is no one '
                         'listening on the other side.')
            raise

    def disconnect(self):
        """
        Close the channel and then the connection to the AMQP server.

        The connection is closed even if closing the channel fails. A
        RuntimeError raised while closing is logged and re-raised.
        """
        try:
            try:
                self.channel.close()
            finally:
                # Always attempt to release the connection, even when the
                # channel could not be closed cleanly.
                self.connection.close()
        except RuntimeError as e:
            # A unicode format string makes the logging module render the
            # exception as text on both Python 2 and Python 3.
            logger.error(u'An error has occured while disconnecting. %s', e)
            raise

    def send(self, message):
        """
        Publish the given metrics to the exchange named by ``amqp_queue``.

        ``message`` is an iterable of strings; the items are joined with
        newlines into a single message body. This function expects that the
        graphite server wants the metric name given in the message body.
        (This option must be enabled on the server. Otherwise it can't parse
        the data sent.)

        Any error raised while publishing is logged and re-raised.
        """
        body = "\n".join(message)
        try:
            self.channel.basic_publish(exchange=self.amqp_queue,
                                       routing_key='', body=body)
        except Exception:
            logger.error('An error has occured while sending metrics (%dB).',
                         len(body))
            raise

    def collect_node(self):
        """
        Collect the metrics of the node itself (cpu, memory, swap, logged in
        users, boot time, disk I/O and per-interface network counters).
        Interfaces whose name starts with 'cloud-' are skipped.

        Returns a list of '<host>.<metric> <value> <timestamp>' strings,
        all carrying the same timestamp.
        """
        now = time.time()
        # Sample the cpu times once so that the user and system values come
        # from the same measurement.
        cpu_times = psutil.cpu_times()
        metrics = {
            'cpu.usage': psutil.cpu_percent(interval=0.0),
            'cpu.times': cpu_times.user + cpu_times.system,
            'memory.usage': psutil.virtual_memory().percent,
            'swap.usage': psutil.swap_memory().percent,
            'user.count': len(psutil.get_users()),
            'system.boot_time': psutil.get_boot_time()
        }

        # _asdict() is the namedtuple API for a field -> value mapping.
        # NOTE(review): assumes disk_io_counters() returns a namedtuple, as
        # the original __dict__ access implied -- confirm for the installed
        # psutil version.
        for k, v in psutil.disk_io_counters()._asdict().items():
            metrics['disk.%s' % k] = v

        interfaces = psutil.network_io_counters(pernic=True)
        for interface, data in interfaces.items():
            if interface.startswith('cloud-'):
                continue
            for metric in ('packets_sent', 'packets_recv',
                           'bytes_sent', 'bytes_recv'):
                metrics['network.%s-%s' %
                        (metric, interface)] = getattr(data, metric)

        return ['%(host)s.%(name)s %(val)f %(time)d' % {'host': self.name,
                                                        'name': name,
                                                        'val': value,
                                                        'time': now}
                for name, value in metrics.items()]

    def collect_vms(self):
        """
        Collect metrics about the processes named 'kvm' running on the node.

        The VM name and its memory size are parsed from the command line of
        each kvm process. For every VM the memory and cpu usage is reported,
        followed by the network counters of the interfaces named
        '<vm name>-<suffix>', and finally the number of VMs found.

        Returns a list of metric strings sharing one timestamp.
        """
        metrics = []
        now = time.time()
        running_vms = []

        # The parser does not depend on the process, so build it only once.
        # NOTE(review): the trailing space in '-m ' looks intentional --
        # presumably it keeps argparse from matching other options that
        # start with '-m' (e.g. -machine) as '-m'; confirm before changing.
        parser = argparse.ArgumentParser()
        parser.add_argument('-name')
        parser.add_argument('--memory-size', '-m ', type=int)

        for entry in psutil.get_process_list():
            try:
                if entry.name != 'kvm':
                    continue
                args, unknown = parser.parse_known_args(entry.cmdline[1:])

                process = psutil.Process(entry.pid)

                mem_perc = (process.get_memory_percent()
                            / 100 * args.memory_size)
                metrics.append('vm.%(name)s.memory.usage %(value)f '
                               '%(time)d' % {'name': args.name,
                                             'value': mem_perc,
                                             'time': now})
                user_time, system_time = process.get_cpu_times()
                sum_time = system_time + user_time
                metrics.append('vm.%(name)s.cpu.usage %(value)f '
                               '%(time)d' % {'name': args.name,
                                             'value': sum_time,
                                             'time': now})
                running_vms.append(args.name)
            except psutil.NoSuchProcess:
                # The process exited between listing and inspection.
                logger.warning('Process %d lost.', entry.pid)

        interfaces = psutil.network_io_counters(pernic=True)
        for interface, data in interfaces.items():
            try:
                vm, vlan = interface.rsplit('-', 1)
            except ValueError:
                # No '-' in the interface name, so it cannot belong to a VM.
                continue
            if vm not in running_vms:
                continue
            for metric in ('packets_sent', 'packets_recv',
                           'bytes_sent', 'bytes_recv'):
                metrics.append(
                    'vm.%(name)s.network.%(metric)s-'
                    '%(interface)s %(data)f %(time)d' %
                    {'name': vm,
                     'interface': vlan,
                     'metric': metric,
                     'time': now,
                     'data': getattr(data, metric)})

        # Use the same timestamp as every other metric of this batch.
        metrics.append(
            '%(host)s.vmcount %(data)d %(time)d' % {
                'host': self.name,
                'data': len(running_vms),
                'time': now})

        return metrics

    @staticmethod
    def _chunker(seq, size):
        """Yield seq in size-long chunks.

        Each chunk is a lazy iterator over at most ``size`` consecutive
        items of ``seq``; the last chunk may be shorter.
        """
        # range() instead of xrange() keeps this working on Python 2 and 3.
        for pos in range(0, len(seq), size):
            yield islice(seq, pos, pos + size)

    def run(self):
        """
        Call this method to start reporting to the server. It connects,
        then every 10 seconds collects the node and VM metrics and sends
        them in chunks of 100 lines, until interrupted from the keyboard.
        The connection is closed when the loop ends for any reason.
        """
        self.connect()
        try:
            while True:
                metrics = self.collect_node() + self.collect_vms()
                if metrics:
                    for chunk in self._chunker(metrics, 100):
                        self.send(chunk)
                    logger.info("%d metrics sent", len(metrics))
                time.sleep(10)
        except KeyboardInterrupt:
            logger.info("Reporting has stopped by the user. Exiting...")
        finally:
            self.disconnect()