Commit f1cf131d by Szabolcs Gelencsér Committed by Gelencsér Szabolcs

Implement diagnostics collection from Azure VMs

parent ff377c98
*.pyc
*.swp
.ropeproject/*
__all__ = ["client", "collectables", "metrics", "cnfparse"]
__all__ = ["azureperformancecounters","client", "collectables", "metrics", "cnfparse"]
import logging
import requests
import azure.mgmt.storage
import re
from azure.storage.table import TableService, Entity
logger = logging.getLogger(__name__)
class AzureVmPerformanceCounters(object):
    """
    Get performance counters for Azure VMs.

    For Linux:
        - Processor usage
        - Memory usage
        - Bytes received, sent
        - Packets received, sent
    For Windows:
        - Processor usage
        - Memory usage
    """

    # Performance counters queried from Linux guests, mapped to the
    # short metric names used in self.performance_counters.
    __linux_performance_counters = {
        "\\Processor\\PercentProcessorTime": "processor_usage",
        "\\Memory\\PercentUsedMemory": "memory_usage",
        "\\NetworkInterface\\BytesReceived": "bytes_recv",
        "\\NetworkInterface\\BytesTransmitted": "bytes_sent",
        "\\NetworkInterface\\PacketsReceived": "packets_recv",
        "\\NetworkInterface\\PacketsTransmitted": "packets_sent",
    }

    # Performance counters queried from Windows guests.
    __windows_performance_counters = {
        "\\Processor(_Total)\\% Processor Time": "processor_usage",
        "\\Memory\\% Committed Bytes In Use": "memory_usage",
    }

    def __get_storage_table_infos(self):
        """
        Get information about the storage table of the VM through the
        Insights API.

        Relevant information is: storage account name, storage table
        names.  Returns the "value" list of the metricDefinitions REST
        response, or None when the response has no "value" key.
        """
        # Build the OData filter restricting the query to the counters
        # we are interested in.
        filter_string = None
        for pc in self.__performance_counters.keys():
            if filter_string is None:
                filter_string = "&$filter=name.value eq '%s'" % (pc)
            else:
                filter_string += " or name.value eq '%s'" % (pc)
        # REST API url to query
        list_metrics_url = "https://management.azure.com/subscriptions/" \
            "%s/resourceGroups/%s/providers/" \
            "Microsoft.Compute/virtualMachines/%s/metricDefinitions" \
            "?api-version=2014-04-01%s" % (
                self.subscription_id, self.resource_group,
                self.vm.name, filter_string,
            )
        headers = {"Authorization": 'Bearer ' + self.accessToken}
        json_output = requests.get(list_metrics_url, headers=headers).json()
        # 'in' instead of dict.has_key(), which was removed in Python 3
        if "value" in json_output:
            return json_output["value"]
        return None

    def __get_access_key_for_storage_account(self):
        """
        Get the access key for the storage account related to our VM.
        """
        storage_client = azure.mgmt.storage.StorageManagementClient(
            self.credentials,
            self.subscription_id,
        )
        keys = storage_client.storage_accounts.list_keys(
            self.resource_group, self.account_name
        )
        return keys.keys[0].value

    def __get_performance_counter_values(self):
        """
        Get all the performance counters we are interested in,
        in one OData request against the VM's diagnostics table.
        """
        table_service = TableService(
            account_name=self.account_name,
            account_key=self.account_key
        )
        filter_string = "PartitionKey eq '%s' and ( " % (
            self.partition_key
        )
        first_pc_id = True
        for pc_id in self.counter_ids:
            if first_pc_id:
                first_pc_id = False
            else:
                filter_string += " or "
            filter_string += "CounterName eq '%s'" % (pc_id)
        filter_string += " )"
        pcs = table_service.query_entities(
            self.table_name,
            filter=filter_string,
        )
        # NOTE(review): keeps only the first occurrence of each counter
        # and stops at the first repeated name -- assumes the entities
        # arrive newest-first; confirm against the table's ordering.
        most_recent_pcs = []
        most_recent_pcs_names = []
        for pc in pcs:
            if pc["CounterName"] in most_recent_pcs_names:
                break
            most_recent_pcs_names.append(pc["CounterName"])
            most_recent_pcs.append(pc)
        return most_recent_pcs

    def __init__(self, credentials, accessToken, subscription_id,
                 resource_group, vm):
        """
        Get relevant performance counters for an Azure Virtual Machine.

        Relevant performance counters are:
            - Processor usage,
            - Memory usage,
            - Bytes and packets received and transmitted over the
              network interfaces.

        Parameters:
        :credentials: <azure.common.credentials> object for SDK calls
        :accessToken: will be passed to 'Authorization'
                      in REST API calls
        :subscription_id: subscription id
        :resource_group: vm's resource group's name
        :vm: <azure.mgmt.compute.models.VirtualMachine> object

        The collected values end up in self.performance_counters, a
        dict mapping short metric name -> {"value": ..., "timestamp": ...}.
        """
        self.credentials = credentials
        self.accessToken = accessToken
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.vm = vm
        self.account_key = None
        self.counter_ids = []
        self.performance_counters = {}
        # choose the performance-counter dictionary based on guest OS
        if vm.os_profile.linux_configuration is not None:
            self.__performance_counters = \
                AzureVmPerformanceCounters.__linux_performance_counters
            self.__aggregation_type = "Last"
        else:
            self.__performance_counters = \
                AzureVmPerformanceCounters.__windows_performance_counters
            self.__aggregation_type = "Average"
        # get storage table infos
        logger.info("get storage table infos vm '%s'" % (vm.name))
        storage_table_infos = self.__get_storage_table_infos()
        if storage_table_infos is not None:
            for table_info in storage_table_infos:
                # next() instead of generator .next(): works on both
                # Python 2 and Python 3
                table_minute_grain = next(
                    t_grained for t_grained
                    in table_info["metricAvailabilities"]
                    if t_grained["timeGrain"] == "PT1M"
                )
                table_endpoint = \
                    table_minute_grain["location"]["tableEndpoint"]
                # extract the account name from the table endpoint URL;
                # dots are escaped so they match only literal dots
                self.account_name = re.match(
                    r'https?://(.*?)\.table\.core\.windows\.net/',
                    table_endpoint,
                ).group(1)
                # fetch the account key only once, on first need
                if self.account_key is None:
                    logger.info("get access key for account '%s'" % (
                        self.account_name
                    ))
                    self.account_key = \
                        self.__get_access_key_for_storage_account()
                # pick the table with the most recent start time
                latest_table = None
                for t in table_minute_grain["location"]["tableInfo"]:
                    if latest_table is None \
                            or latest_table["startTime"] < t["startTime"]:
                        latest_table = t
                self.table_name = latest_table["tableName"]
                self.partition_key = \
                    table_minute_grain["location"]["partitionKey"]
                # counter id: un-escape the doubled backslashes of the
                # REST response
                counter_id = \
                    table_info["name"]["value"].replace("\\\\", "\\")
                self.counter_ids.append(counter_id)
            logger.info("get value of performance counters")
            most_recent_pcs = self.__get_performance_counter_values()
            # store counters under their short metric names
            for pc in most_recent_pcs:
                counter_name = \
                    self.__performance_counters[pc["CounterName"]]
                self.performance_counters[counter_name] = {
                    "value": pc[self.__aggregation_type],
                    "timestamp": pc["Timestamp"],
                }
......@@ -8,6 +8,10 @@ import os
import pika
import psutil
import time
import adal
from azureperformancecounters import AzureVmPerformanceCounters
from azure.common.credentials import ServicePrincipalCredentials
import azure.mgmt.compute
logger = logging.getLogger(__name__)
......@@ -21,6 +25,13 @@ class Client:
"amqp_pass": "GRAPHITE_AMQP_PASSWORD",
"amqp_queue": "GRAPHITE_AMQP_QUEUE",
"amqp_vhost": "GRAPHITE_AMQP_VHOST",
"subscription_id": "AZURE_SUBSCRIPTION_ID",
"resource_group": "AZURE_RESOURCE_GROUP",
"client_id": "AZURE_CLIENT_ID",
"client_oauth_id": "AZURE_CLIENT_OAUTH_ID",
"client_secret": "AZURE_CLIENT_SECRET",
"client_url": "AZURE_CLIENT_URL",
"tenant_id": "AZURE_TENANT_ID",
}
def __init__(self):
......@@ -35,6 +46,11 @@ class Client:
- GRAPHITE_AMQP_PASSWORD:
- GRAPHITE_AMQP_QUEUE:
- GRAPHITE_AMQP_VHOST:
- AZURE_SUBSCRIPTION_ID:
- AZURE_CLIENT_ID:
- AZURE_CLIENT_SECRET:
- AZURE_CLIENT_URL:
- AZURE_TENANT_ID:
Missing only one of these variables will cause the client not to work.
"""
self.name = 'circle.%s' % gethostname()
......@@ -45,6 +61,30 @@ class Client:
else:
raise RuntimeError('%s environment variable missing' % env_var)
self.credentials = ServicePrincipalCredentials(
client_id=self.client_id,
secret=self.client_secret,
tenant=self.tenant_id,
)
self.compute_client = azure.mgmt.compute.ComputeManagementClient(
self.credentials,
self.subscription_id,
)
def refresh_rest_api_token(self):
    """
    Acquire a fresh OAuth bearer token for the Azure management REST
    API via client credentials and store it in self.access_token.
    """
    authority_url = \
        'https://login.microsoftonline.com/%s' % (self.client_oauth_id)
    context = adal.AuthenticationContext(authority_url)
    token_response = context.acquire_token_with_client_credentials(
        'https://management.azure.com/',
        self.client_url,
        self.client_secret,
    )
    self.access_token = token_response["accessToken"]
def connect(self):
"""
This method creates the connection to the queue of the graphite
......@@ -99,118 +139,111 @@ class Client:
len(body))
raise
def collect_node(self):
def _collect_running_vms_for_group(self, group_name):
    """
    Collect running vms for a resource group.

    :group_name: resource group name to collect running vms for

    Returns a list of <azure.mgmt.compute.models.VirtualMachine>
    objects whose power state is "PowerState/running".
    """
    logger.info("getting vm list for resource group '%s'" % (group_name))
    virtual_machine_names = self.compute_client.virtual_machines.list(
        group_name)
    running_virtual_machines = []
    for vm_name in virtual_machine_names:
        logger.info("get state of vm '%s'" % (vm_name.name))
        # get vm by resource group and name; instanceView is needed
        # for the power-state statuses
        vm = self.compute_client.virtual_machines.get(
            group_name, vm_name.name, expand="instanceView"
        )
        # next() instead of generator .next(): works on both
        # Python 2 and Python 3
        vm_power_state = next(
            status for status in vm.instance_view.statuses
            if status.code.startswith("PowerState")
        ).code
        if vm_power_state == "PowerState/running":
            running_virtual_machines.append(vm)
    return running_virtual_machines
def datetime_to_time(self, dt):
    """
    Convert a datetime object to a POSIX timestamp (float seconds).

    :dt: datetime object to convert to time

    time.mktime() interprets *dt* as local time and drops the
    sub-second part, so the microseconds are added back explicitly.
    """
    whole_seconds = time.mktime(dt.timetuple())
    fraction = dt.microsecond / 1E6
    return whole_seconds + fraction
def collect_vms(self):
    """
    Fetch information about the vms running in Azure and build
    graphite metric lines about their resource usage.

    Emits memory usage, cpu percent and (when present) network
    byte/packet counters per vm, plus one 'azure.vmcount' metric,
    each timestamped with the counter's own timestamp.
    """
    metrics = []
    running_vms = self._collect_running_vms_for_group(self.resource_group)
    for vm in running_vms:
        vm_counters = AzureVmPerformanceCounters(
            self.credentials, self.access_token,
            self.subscription_id, self.resource_group, vm
        ).performance_counters
        # 'in' instead of dict.has_key(), which was removed in Python 3
        if "memory_usage" in vm_counters:
            metrics.append(
                'vm.%(name)s.memory.usage %(value)f %(time)d' % {
                    'name': vm.name,
                    'value': vm_counters["memory_usage"]["value"],
                    'time': self.datetime_to_time(
                        vm_counters["memory_usage"]["timestamp"],
                    ),
                }
            )
        if "processor_usage" in vm_counters:
            metrics.append(
                'vm.%(name)s.cpu.percent %(value)f %(time)d' % {
                    'name': vm.name,
                    'value': vm_counters["processor_usage"]["value"],
                    'time': self.datetime_to_time(
                        vm_counters["processor_usage"]["timestamp"],
                    ),
                }
            )
        # network counters exist only for Linux guests; emit whichever
        # of the four are present
        for unit in ["bytes", "packets"]:
            for direction in ["recv", "sent"]:
                pc_name = "%s_%s" % (unit, direction)
                if pc_name in vm_counters:
                    metrics.append(
                        'vm.%(name)s.network.%(metric)s-'
                        '%(interface)s %(data)f %(time)d' % {
                            'name': vm.name,
                            'interface': "eth0",
                            'metric': pc_name,
                            'time': self.datetime_to_time(
                                vm_counters[pc_name]["timestamp"],
                            ),
                            'data': vm_counters[pc_name]["value"],
                        }
                    )
    metrics.append(
        '%(host)s.vmcount %(data)d %(time)d' % {
            'host': "azure",
            'data': len(running_vms),
            'time': time.time(),
        }
    )
    return metrics
@staticmethod
def _chunker(seq, size):
"""Yield seq in size-long chunks.
"""
Yield seq in size-long chunks.
"""
for pos in xrange(0, len(seq), size):
yield islice(seq, pos, pos + size)
......@@ -225,7 +258,8 @@ class Client:
self.processes = {}
try:
while True:
metrics = self.collect_node() + self.collect_vms()
self.refresh_rest_api_token()
metrics = self.collect_vms()
if metrics:
for chunk in self._chunker(metrics, 100):
self.send(chunk)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment