node.py 13.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE.  If not, see <http://www.gnu.org/licenses/>.

18
from __future__ import absolute_import, unicode_literals
19
from functools import update_wrapper
Őry Máté committed
20
from logging import getLogger
21
from warnings import warn
Bach Dániel committed
22
import requests
Őry Máté committed
23

Bach Dániel committed
24
from django.conf import settings
Őry Máté committed
25
from django.db.models import (
26
    CharField, IntegerField, ForeignKey, BooleanField, ManyToManyField,
27
    FloatField, permalink,
Őry Máté committed
28
)
29
from django.utils import timezone
30
from django.utils.translation import ugettext_lazy as _, ugettext_noop
Őry Máté committed
31 32 33 34 35

from celery.exceptions import TimeoutError
from model_utils.models import TimeStampedModel
from taggit.managers import TaggableManager

36
from common.models import method_cache, WorkerNotFound, HumanSortField
37
from common.operations import OperatedMixin
Őry Máté committed
38
from firewall.models import Host
39 40 41 42
from ..tasks import vm_tasks
from .activity import node_activity, NodeActivity
from .common import Trait

43

Őry Máté committed
44 45 46
logger = getLogger(__name__)


47 48 49 50
def node_available(function):
    """Decorate methods to ignore disabled Nodes.
    """
    def decorate(self, *args, **kwargs):
51
        if self.enabled and self.online:
52 53 54
            return function(self, *args, **kwargs)
        else:
            return None
55 56
    update_wrapper(decorate, function)
    decorate._original = function
57 58 59
    return decorate


60
class Node(OperatedMixin, TimeStampedModel):
Őry Máté committed
61 62 63 64 65 66

    """A VM host machine, a hypervisor.
    """
    name = CharField(max_length=50, unique=True,
                     verbose_name=_('name'),
                     help_text=_('Human readable name of node.'))
67
    normalized_name = HumanSortField(monitor='name', max_length=100)
Őry Máté committed
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
    priority = IntegerField(verbose_name=_('priority'),
                            help_text=_('Node usage priority.'))
    host = ForeignKey(Host, verbose_name=_('host'),
                      help_text=_('Host in firewall.'))
    enabled = BooleanField(verbose_name=_('enabled'), default=False,
                           help_text=_('Indicates whether the node can '
                                       'be used for hosting.'))
    traits = ManyToManyField(Trait, blank=True,
                             help_text=_("Declared traits."),
                             verbose_name=_('traits'))
    tags = TaggableManager(blank=True, verbose_name=_("tags"))
    overcommit = FloatField(default=1.0, verbose_name=_("overcommit ratio"),
                            help_text=_("The ratio of total memory with "
                                        "to without overcommit."))

    class Meta:
        app_label = 'vm'
        db_table = 'vm_node'
        permissions = ()
87
        ordering = ('-enabled', 'normalized_name')
Őry Máté committed
88

Őry Máté committed
89 90 91
    def __unicode__(self):
        return self.name

92
    @method_cache(10)
93
    def get_online(self):
94
        """Check if the node is online.
Őry Máté committed
95

96
        Check if node is online by queue is available.
97 98
        """
        try:
99
            self.get_remote_queue_name("vm", "fast")
100
        except:
101
            return False
102 103
        else:
            return True
Őry Máté committed
104

105 106
    online = property(get_online)

107
    @node_available
Őry Máté committed
108
    @method_cache(300)
109 110
    def get_info(self):
        return self.remote_query(vm_tasks.get_info,
111
                                 priority='fast',
112 113 114
                                 default={'core_num': '',
                                          'ram_size': '0',
                                          'architecture': ''})
115

116
    info = property(get_info)
117

118 119 120 121 122 123 124 125 126 127
    @property
    def ram_size(self):
        warn('Use Node.info["ram_size"]', DeprecationWarning)
        return self.info['ram_size']

    @property
    def num_cores(self):
        warn('Use Node.info["core_num"]', DeprecationWarning)
        return self.info['core_num']

128 129 130 131 132
    STATES = {False: {False: ('OFFLINE', _('offline')),
                      True: ('DISABLED', _('disabled'))},
              True: {False: ('MISSING', _('missing')),
                     True: ('ONLINE', _('online'))}}

Őry Máté committed
133
    def get_state(self):
134
        """The state combined of online and enabled attributes.
135
        """
136 137
        return self.STATES[self.enabled][self.online][0]

Őry Máté committed
138 139
    state = property(get_state)

140 141
    def get_status_display(self):
        return self.STATES[self.enabled][self.online][1]
142

143
    def disable(self, user=None, base_activity=None):
144
        ''' Disable the node.'''
145
        if self.enabled:
146
            if base_activity:
147 148
                act_ctx = base_activity.sub_activity(
                    'disable', readable_name=ugettext_noop("disable node"))
149
            else:
150 151 152
                act_ctx = node_activity(
                    'disable', node=self, user=user,
                    readable_name=ugettext_noop("disable node"))
153
            with act_ctx:
154 155
                self.enabled = False
                self.save()
156

157
    def enable(self, user=None, base_activity=None):
158
        ''' Enable the node. '''
159
        if self.enabled is not True:
160 161 162 163 164
            if base_activity:
                act_ctx = base_activity.sub_activity('enable')
            else:
                act_ctx = node_activity('enable', node=self, user=user)
            with act_ctx:
165 166
                self.enabled = True
                self.save()
Bach Dániel committed
167
            self.get_info(invalidate_cache=True)
Őry Máté committed
168 169

    @property
170
    @node_available
Őry Máté committed
171 172 173 174 175
    def ram_size_with_overcommit(self):
        """Bytes of total memory including overcommit margin.
        """
        return self.ram_size * self.overcommit

176
    @method_cache(30)
177
    def get_remote_queue_name(self, queue_id, priority=None):
Dudás Ádám committed
178
        """Returns the name of the remote celery queue for this node.
179

Dudás Ádám committed
180 181
        Throws Exception if there is no worker on the queue.
        The result may include dead queues because of caching.
182
        """
183

184 185 186 187
        if vm_tasks.check_queue(self.host.hostname, queue_id, priority):
            queue_name = self.host.hostname + "." + queue_id
            if priority is not None:
                queue_name = queue_name + "." + priority
188
            self.node_online()
189
            return queue_name
190
        else:
191
            if self.enabled:
192
                self.node_offline()
193
            raise WorkerNotFound()
Őry Máté committed
194

195 196 197 198 199 200 201 202 203 204 205
    def node_online(self):
        """Create activity and log entry when node reappears.
        """

        try:
            act = self.activity_log.order_by('-pk')[0]
        except IndexError:
            pass  # no monitoring activity at all
        else:
            logger.debug("The last activity was %s" % act)
            if act.activity_code.endswith("offline"):
Dudás Ádám committed
206
                act = NodeActivity.create(code_suffix='monitor_success_online',
207 208 209 210 211 212
                                          node=self, user=None)
                act.started = timezone.now()
                act.finished = timezone.now()
                act.succeeded = True
                act.save()
                logger.info("Node %s is ONLINE." % self.name)
Bach Dániel committed
213
                self.get_info(invalidate_cache=True)
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238

    def node_offline(self):
        """Called when a node disappears.

        If the node is not already offline, record an activity and a log entry.
        """

        try:
            act = self.activity_log.order_by('-pk')[0]
        except IndexError:
            pass  # no activity at all
        else:
            logger.debug("The last activity was %s" % act)
            if act.activity_code.endswith("offline"):
                return
        act = NodeActivity.create(code_suffix='monitor_failed_offline',
                                  node=self, user=None)
        act.started = timezone.now()
        act.finished = timezone.now()
        act.succeeded = False
        act.save()
        logger.critical("Node %s is OFFLINE%s.", self.name,
                        ", but enabled" if self.enabled else "")
        # TODO: check if we should reschedule any VMs?

239 240
    def remote_query(self, task, timeout=30, priority=None, raise_=False,
                     default=None):
Őry Máté committed
241 242
        """Query the given task, and get the result.

243 244 245 246
        If the result is not ready or worker not reachable
        in timeout secs, return default value or raise a
        TimeoutError or WorkerNotFound exception.
        """
Őry Máté committed
247
        try:
248
            r = task.apply_async(
249 250
                queue=self.get_remote_queue_name('vm', priority),
                expires=timeout + 60)
Őry Máté committed
251
            return r.get(timeout=timeout)
252
        except (TimeoutError, WorkerNotFound):
Őry Máté committed
253 254 255 256 257
            if raise_:
                raise
            else:
                return default

Bach Dániel committed
258
    @property
259
    @node_available
Bach Dániel committed
260
    @method_cache(10)
Bach Dániel committed
261 262
    def monitor_info(self):
        metrics = ('cpu.usage', 'memory.usage')
263
        prefix = 'circle.%s.' % self.host.hostname
Bach Dániel committed
264 265 266 267 268
        params = [('target', '%s%s' % (prefix, metric))
                  for metric in metrics]
        params.append(('from', '-5min'))
        params.append(('format', 'json'))

269
        try:
Bach Dániel committed
270 271 272 273 274 275 276 277 278 279 280 281
            logger.info('%s %s', settings.GRAPHITE_URL, params)
            response = requests.get(settings.GRAPHITE_URL, params=params)

            retval = {}
            for target in response.json():
                # Example:
                # {"target": "circle.szianode.cpu.usage",
                #  "datapoints": [[0.6, 1403045700], [0.5, 1403045760]
                try:
                    metric = target['target']
                    if metric.startswith(prefix):
                        metric = metric[len(prefix):]
282 283
                    else:
                        continue
Bach Dániel committed
284 285 286 287 288 289 290 291
                    value = target['datapoints'][-2][0]
                    retval[metric] = float(value)
                except (KeyError, IndexError, ValueError):
                    continue

            return retval
        except:
            logger.exception('Unhandled exception: ')
292 293
            return self.remote_query(vm_tasks.get_node_metrics, timeout=30,
                                     priority="fast")
294

295
    @property
296
    @node_available
297
    def cpu_usage(self):
Bach Dániel committed
298
        return self.monitor_info.get('cpu.usage') / 100
299

300
    @property
301
    @node_available
302
    def ram_usage(self):
Bach Dániel committed
303
        return self.monitor_info.get('memory.usage') / 100
304

305
    @property
306
    @node_available
307 308 309
    def byte_ram_usage(self):
        return self.ram_usage * self.ram_size

310 311
    def get_status_icon(self):
        return {
312
            'OFFLINE': 'fa-minus-circle',
Kálmán Viktor committed
313
            'DISABLED': 'fa-moon-o',
314 315 316
            'MISSING': 'fa-warning',
            'ONLINE': 'fa-play-circle'}.get(self.get_state(),
                                            'fa-question-circle')
317

318 319
    def get_status_label(self):
        return {
320 321 322
            'OFFLINE': 'label-warning',
            'DISABLED': 'label-warning',
            'MISSING': 'label-danger',
323 324 325
            'ONLINE': 'label-success'}.get(self.get_state(),
                                           'label-danger')

326
    @node_available
Őry Máté committed
327
    def update_vm_states(self):
328 329 330 331 332
        """Update state of Instances running on this Node.

        Query state of all libvirt domains, and notify Instances by their
        vm_state_changed hook.
        """
Őry Máté committed
333
        domains = {}
334 335
        domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5,
                                        priority="fast")
336 337 338 339
        if domain_list is None:
            logger.info("Monitoring failed at: %s", self.name)
            return
        for i in domain_list:
Őry Máté committed
340 341 342 343 344 345 346 347
            # [{'name': 'cloud-1234', 'state': 'RUNNING', ...}, ...]
            try:
                id = int(i['name'].split('-')[1])
            except:
                pass  # name format doesn't match
            else:
                domains[id] = i['state']

348 349
        instances = [{'id': i.id, 'state': i.state}
                     for i in self.instance_set.order_by('id').all()]
Őry Máté committed
350 351 352 353 354 355
        for i in instances:
            try:
                d = domains[i['id']]
            except KeyError:
                logger.info('Node %s update: instance %s missing from '
                            'libvirt', self, i['id'])
356
                # Set state to STOPPED when instance is missing
357 358
                self.instance_set.get(id=i['id']).vm_state_changed(
                    'STOPPED', None)
Őry Máté committed
359 360 361 362 363
            else:
                if d != i['state']:
                    logger.info('Node %s update: instance %s state changed '
                                '(libvirt: %s, db: %s)',
                                self, i['id'], d, i['state'])
364
                    self.instance_set.get(id=i['id']).vm_state_changed(d)
Őry Máté committed
365 366

                del domains[i['id']]
Őry Máté committed
367
        for id, state in domains.iteritems():
368
            from .instance import Instance
Őry Máté committed
369 370 371
            logger.error('Node %s update: domain %s in libvirt but not in db.',
                         self, id)
            Instance.objects.get(id=id).vm_state_changed(state, self)
372 373 374

    @classmethod
    def get_state_count(cls, online, enabled):
375 376
        return len([1 for i in
                    cls.objects.filter(enabled=enabled).select_related('host')
377
                    if i.online == online])
378 379 380 381

    @permalink
    def get_absolute_url(self):
        return ('dashboard.views.node-detail', None, {'pk': self.id})