node.py 10.8 KB
Newer Older
1
from __future__ import absolute_import, unicode_literals
Őry Máté committed
2
from logging import getLogger
3
from warnings import warn
Őry Máté committed
4 5

from django.db.models import (
6
    CharField, IntegerField, ForeignKey, BooleanField, ManyToManyField,
7
    FloatField, permalink,
Őry Máté committed
8
)
9
from django.utils import timezone
Őry Máté committed
10 11 12 13 14 15
from django.utils.translation import ugettext_lazy as _

from celery.exceptions import TimeoutError
from model_utils.models import TimeStampedModel
from taggit.managers import TaggableManager

16
from common.models import method_cache, WorkerNotFound, HumanSortField
17
from common.operations import OperatedMixin
Őry Máté committed
18
from firewall.models import Host
Gregory Nagy committed
19 20
from monitor.calvin.calvin import Query
from monitor.calvin.calvin import GraphiteHandler
21 22 23 24
from ..tasks import vm_tasks
from .activity import node_activity, NodeActivity
from .common import Trait

25

Őry Máté committed
26 27 28
# Module-wide logger, named after this module so log records can be
# filtered by their origin.
logger = getLogger(name=__name__)


29 30 31 32
def node_available(function):
    """Decorate methods to ignore disabled Nodes.

    The wrapped method is only called when the Node is both ``enabled`` and
    ``online``; otherwise the wrapper returns ``None`` without calling it.
    ``functools.wraps`` copies the wrapped method's ``__name__``, ``__doc__``
    and ``__module__`` onto the wrapper, so stacked decorators (``property``,
    ``method_cache``) and introspection still see the original metadata.
    """
    from functools import wraps

    @wraps(function)
    def decorate(self, *args, **kwargs):
        # ``enabled`` is tested first so the (possibly remote, see
        # Node.get_online) ``online`` check is skipped for disabled Nodes.
        if self.enabled and self.online:
            return function(self, *args, **kwargs)
        return None
    return decorate


40
class Node(OperatedMixin, TimeStampedModel):

    """A VM host machine, a hypervisor.

    Runtime information (hardware info, libvirt domain states, fallback
    metrics) is obtained by sending celery tasks to the node's ``vm`` queue;
    see :meth:`remote_query` and :meth:`get_remote_queue_name`.
    """
    name = CharField(max_length=50, unique=True,
                     verbose_name=_('name'),
                     help_text=_('Human readable name of node.'))
    normalized_name = HumanSortField(monitor='name', max_length=100)
    priority = IntegerField(verbose_name=_('priority'),
                            help_text=_('Node usage priority.'))
    host = ForeignKey(Host, verbose_name=_('host'),
                      help_text=_('Host in firewall.'))
    enabled = BooleanField(verbose_name=_('enabled'), default=False,
                           help_text=_('Indicates whether the node can '
                                       'be used for hosting.'))
    traits = ManyToManyField(Trait, blank=True,
                             help_text=_("Declared traits."),
                             verbose_name=_('traits'))
    tags = TaggableManager(blank=True, verbose_name=_("tags"))
    overcommit = FloatField(default=1.0, verbose_name=_("overcommit ratio"),
                            help_text=_("The ratio of total memory with "
                                        "to without overcommit."))

    class Meta:
        app_label = 'vm'
        db_table = 'vm_node'
        permissions = ()
        ordering = ('-enabled', 'normalized_name')

    def __unicode__(self):
        return self.name

    @method_cache(10)
    def get_online(self):
        """Check if the node is online.

        The node counts as online when its ``vm`` queue is available, i.e.
        :meth:`get_remote_queue_name` does not raise.  The result is cached
        by ``method_cache(10)``.
        """
        try:
            self.get_remote_queue_name("vm")
        except Exception:
            # Any failure to find the queue means "offline".  A bare
            # ``except`` would also swallow KeyboardInterrupt/SystemExit.
            return False
        return True

    online = property(get_online)

    @node_available
    @method_cache(300)
    def get_info(self):
        """Return the hardware info reported by the node's worker.

        When the query times out or no worker is found, a placeholder dict
        is returned instead; note that its ``ram_size`` is the *string*
        ``'0'``, so numeric consumers must convert it.  Returns ``None`` for
        disabled or offline nodes (``node_available``).
        """
        return self.remote_query(vm_tasks.get_info,
                                 default={'core_num': '',
                                          'ram_size': '0',
                                          'architecture': ''})

    info = property(get_info)

    @property
    def ram_size(self):
        """Deprecated alias of ``info['ram_size']``."""
        # stacklevel=2 attributes the warning to the caller, not this file.
        warn('Use Node.info["ram_size"]', DeprecationWarning, stacklevel=2)
        return self.info['ram_size']

    @property
    def num_cores(self):
        """Deprecated alias of ``info['core_num']``."""
        warn('Use Node.info["core_num"]', DeprecationWarning, stacklevel=2)
        return self.info['core_num']

    # Indexed as STATES[enabled][online]; every entry is a
    # (state code, translated display label) pair.
    STATES = {False: {False: ('OFFLINE', _('offline')),
                      True: ('DISABLED', _('disabled'))},
              True: {False: ('MISSING', _('missing')),
                     True: ('ONLINE', _('online'))}}

    def get_state(self):
        """The state combined of online and enabled attributes.
        """
        return self.STATES[self.enabled][self.online][0]

    state = property(get_state)

    def get_status_display(self):
        """Translated display label belonging to :attr:`state`."""
        return self.STATES[self.enabled][self.online][1]

    def disable(self, user=None, base_activity=None):
        """Disable the node; do nothing if it is already disabled.

        :param user: user the new node activity is attributed to; only used
                     when ``base_activity`` is not given.
        :param base_activity: when given, the ``disable`` activity is
                              created as its sub-activity instead of as a
                              new node activity.
        """
        if not self.enabled:
            return
        if base_activity:
            act_ctx = base_activity.sub_activity('disable')
        else:
            act_ctx = node_activity('disable', node=self, user=user)
        with act_ctx:
            self.enabled = False
            self.save()

    def enable(self, user=None):
        """Enable the node; do nothing if it is already enabled.

        Records an ``enable`` node activity attributed to ``user``, then
        calls :meth:`get_info` with ``invalidate_cache=True`` (presumably
        refreshing the cached hardware info -- see ``method_cache``).
        """
        if self.enabled is True:
            return
        with node_activity(code_suffix='enable', node=self, user=user):
            self.enabled = True
            self.save()
        self.get_info(invalidate_cache=True)

    @property
    @node_available
    def ram_size_with_overcommit(self):
        """Bytes of total memory including overcommit margin.

        Reads ``info['ram_size']`` directly (the ``ram_size`` property is
        deprecated and warns) and converts it to ``float``, because the
        fallback value of :meth:`get_info` is the string ``'0'``.
        """
        return float(self.info['ram_size']) * self.overcommit

    @method_cache(30)
    def get_remote_queue_name(self, queue_id):
        """Returns the name of the remote celery queue for this node.

        Side effects: calls :meth:`node_online` when the queue exists, and
        :meth:`node_offline` when it does not and the node is enabled.

        Throws Exception (``WorkerNotFound``) if there is no worker on the
        queue.  The result may include dead queues because of caching.
        """
        if vm_tasks.check_queue(self.host.hostname, queue_id):
            self.node_online()
            return self.host.hostname + "." + queue_id
        if self.enabled:
            self.node_offline()
        raise WorkerNotFound()

    def _get_last_activity(self):
        """Return the most recent activity of this node, or ``None``."""
        try:
            return self.activity_log.order_by('-pk')[0]
        except IndexError:
            return None

    def _record_monitor_activity(self, code_suffix, succeeded):
        """Create and save an already finished monitoring activity."""
        act = NodeActivity.create(code_suffix=code_suffix,
                                  node=self, user=None)
        # One timestamp, so started == finished exactly.
        now = timezone.now()
        act.started = now
        act.finished = now
        act.succeeded = succeeded
        act.save()
        return act

    def node_online(self):
        """Create activity and log entry when node reappears.

        Only acts when the node's latest activity code ends with
        ``offline``: then a succeeded ``monitor_success_online`` activity is
        recorded and the cached node info is invalidated.
        """
        last = self._get_last_activity()
        if last is None:
            return  # no monitoring activity at all
        logger.debug("The last activity was %s", last)
        if not last.activity_code.endswith("offline"):
            return
        self._record_monitor_activity('monitor_success_online', True)
        logger.info("Node %s is ONLINE.", self.name)
        self.get_info(invalidate_cache=True)

    def node_offline(self):
        """Called when a node disappears.

        If the node is not already offline, record an activity and a log entry.
        """
        last = self._get_last_activity()
        if last is not None:
            logger.debug("The last activity was %s", last)
            if last.activity_code.endswith("offline"):
                return
        self._record_monitor_activity('monitor_failed_offline', False)
        logger.critical("Node %s is OFFLINE%s.", self.name,
                        ", but enabled" if self.enabled else "")
        # TODO: check if we should reschedule any VMs?

    def remote_query(self, task, timeout=30, raise_=False, default=None):
        """Query the given task, and get the result.

        If the result is not ready or worker not reachable
        in timeout secs, return default value or raise a
        TimeoutError or WorkerNotFound exception.

        :param task: celery task sent to this node's ``vm`` queue.
        :param timeout: seconds to wait for the result.
        :param raise_: re-raise TimeoutError/WorkerNotFound instead of
                       returning ``default``.
        :param default: value returned on failure when ``raise_`` is false.
        """
        try:
            # The task expires 60 s after the wait timeout; presumably so a
            # task nobody waits for anymore is not executed -- TODO confirm.
            r = task.apply_async(
                queue=self.get_remote_queue_name('vm'), expires=timeout + 60)
            return r.get(timeout=timeout)
        except (TimeoutError, WorkerNotFound):
            if raise_:
                raise
            return default

    @node_available
    def get_monitor_info(self):
        """Return the latest ``cpu.usage`` and ``memory.usage`` values.

        Values are fetched through ``GraphiteHandler``; when it cannot be
        created (``RuntimeError``) the metrics are queried from the node's
        worker instead, which yields ``None`` on timeout.  Missing or null
        datapoints are reported as ``0``.
        """
        try:
            handler = GraphiteHandler()
        except RuntimeError:
            return self.remote_query(vm_tasks.get_node_metrics, 30)

        query = Query()
        query.set_target(self.host.hostname + ".circle")
        query.set_format("json")
        query.set_relative_start(5, "minutes")

        metrics = ["cpu.usage", "memory.usage"]
        for metric in metrics:
            query.set_metric(metric)
            query.generate()
            handler.put(query)
            handler.send()

        collected = {}
        for metric in metrics:
            response = handler.pop()
            try:
                # First element of the last datapoint of the first series.
                value = response[0]["datapoints"][-1][0]
            except (IndexError, KeyError):
                value = None
            collected[metric] = 0 if value is None else value
        return collected

    def _get_usage_ratio(self, metric):
        """Return ``metric`` from :meth:`get_monitor_info` divided by 100.

        The metrics are presumably percentages -- TODO confirm.  Returns
        ``None`` when no monitoring data could be obtained.
        """
        monitor_info = self.get_monitor_info()
        if monitor_info is None:
            return None
        return float(monitor_info[metric]) / 100

    @property
    @node_available
    def cpu_usage(self):
        """CPU usage ratio, or ``None`` when unavailable."""
        return self._get_usage_ratio("cpu.usage")

    @property
    @node_available
    def ram_usage(self):
        """Memory usage ratio, or ``None`` when unavailable."""
        return self._get_usage_ratio("memory.usage")

    @property
    @node_available
    def byte_ram_usage(self):
        """Used memory: :attr:`ram_usage` times ``info['ram_size']``.

        ``ram_size`` is converted to ``float`` because the fallback value of
        :meth:`get_info` is the string ``'0'``.  Returns ``None`` when no
        usage data is available.
        """
        usage = self.ram_usage
        if usage is None:
            return None
        return usage * float(self.info['ram_size'])

    @node_available
    def update_vm_states(self):
        """Update state of Instances running on this Node.

        Query state of all libvirt domains, and notify Instances by their
        vm_state_changed hook.  Instances missing from libvirt are reported
        as ``STOPPED``; domains without a matching instance are only logged.
        """
        domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5)
        if domain_list is None:
            logger.info("Monitoring failed at: %s", self.name)
            return

        domains = {}
        for domain in domain_list:
            # [{'name': 'cloud-1234', 'state': 'RUNNING', ...}, ...]
            try:
                vm_id = int(domain['name'].split('-')[1])
            except (AttributeError, IndexError, KeyError, TypeError,
                    ValueError):
                continue  # name format doesn't match
            domains[vm_id] = domain['state']

        instances = [{'id': i.id, 'state': i.state}
                     for i in self.instance_set.order_by('id').all()]
        for inst in instances:
            try:
                # pop: whatever remains afterwards is unknown to the db.
                libvirt_state = domains.pop(inst['id'])
            except KeyError:
                logger.info('Node %s update: instance %s missing from '
                            'libvirt', self, inst['id'])
                # Set state to STOPPED when instance is missing
                self.instance_set.get(id=inst['id']).vm_state_changed(
                    'STOPPED')
                continue
            if libvirt_state != inst['state']:
                logger.info('Node %s update: instance %s state changed '
                            '(libvirt: %s, db: %s)',
                            self, inst['id'], libvirt_state, inst['state'])
                self.instance_set.get(id=inst['id']).vm_state_changed(
                    libvirt_state)

        for vm_id in domains:
            logger.info('Node %s update: domain %s in libvirt but not in db.',
                        self, vm_id)

    @classmethod
    def get_state_count(cls, online, enabled):
        """Count nodes with the given ``enabled`` flag whose ``online``
        status equals ``online`` (each check may query the node's worker).
        """
        nodes = cls.objects.filter(enabled=enabled).select_related('host')
        return sum(1 for node in nodes if node.online == online)

    @permalink
    def get_absolute_url(self):
        return ('dashboard.views.node-detail', None, {'pk': self.id})