Commit 8b0beaf6 by Guba Sándor Committed by Őry Máté

vm: handle node workers disappearing

Fix all celery calls to use remote_query function.
Fix remote_queue_name to check for node online state.
Add node_online() and node_offline() to track availability of node.
parent 734415ce
...@@ -16,10 +16,11 @@ from firewall.models import Host ...@@ -16,10 +16,11 @@ from firewall.models import Host
from ..tasks import vm_tasks from ..tasks import vm_tasks
from .common import Trait from .common import Trait
from .activity import node_activity from .activity import node_activity, NodeActivity
from monitor.calvin.calvin import Query from monitor.calvin.calvin import Query
from monitor.calvin.calvin import GraphiteHandler from monitor.calvin.calvin import GraphiteHandler
from django.utils import timezone
logger = getLogger(__name__) logger = getLogger(__name__)
...@@ -56,8 +57,14 @@ class Node(TimeStampedModel): ...@@ -56,8 +57,14 @@ class Node(TimeStampedModel):
@method_cache(10, 5) @method_cache(10, 5)
def get_online(self): def get_online(self):
"""Check if the node is online.
return self.remote_query(vm_tasks.ping, timeout=1, default=False) Runs a remote ping task if the worker is running.
"""
try:
return self.remote_query(vm_tasks.ping, timeout=1, default=False)
except WorkerNotFound:
return False
online = property(get_online) online = property(get_online)
...@@ -66,15 +73,14 @@ class Node(TimeStampedModel): ...@@ -66,15 +73,14 @@ class Node(TimeStampedModel):
"""Number of CPU threads available to the virtual machines. """Number of CPU threads available to the virtual machines.
""" """
return self.remote_query(vm_tasks.get_core_num) return self.remote_query(vm_tasks.get_core_num, default=0)
num_cores = property(get_num_cores) num_cores = property(get_num_cores)
@property @property
def state(self): def state(self):
"""Node state. """The state combined of online and enabled attributes.
""" """
if self.enabled and self.online: if self.enabled and self.online:
return 'ONLINE' return 'ONLINE'
elif self.enabled and not self.online: elif self.enabled and not self.online:
...@@ -86,22 +92,25 @@ class Node(TimeStampedModel): ...@@ -86,22 +92,25 @@ class Node(TimeStampedModel):
def disable(self, user=None): def disable(self, user=None):
''' Disable the node.''' ''' Disable the node.'''
with node_activity(code_suffix='disable', node=self, user=user): if self.enabled is True:
self.enabled = False with node_activity(code_suffix='disable', node=self, user=user):
self.save() self.enabled = False
self.save()
def enable(self, user=None): def enable(self, user=None):
''' Enable the node. ''' ''' Enable the node. '''
with node_activity(code_suffix='enable', node=self, user=user): if self.enabled is not True:
self.enabled = True with node_activity(code_suffix='enable', node=self, user=user):
self.save() self.enabled = True
self.save()
self.get_num_cores(invalidate_cache=True)
self.get_ram_size(invalidate_cache=True)
@method_cache(300) @method_cache(300)
def get_ram_size(self): def get_ram_size(self):
"""Bytes of total memory in the node. """Bytes of total memory in the node.
""" """
return self.remote_query(vm_tasks.get_ram_size, default=0)
return self.remote_query(vm_tasks.get_ram_size)
ram_size = property(get_ram_size) ram_size = property(get_ram_size)
...@@ -113,25 +122,77 @@ class Node(TimeStampedModel): ...@@ -113,25 +122,77 @@ class Node(TimeStampedModel):
@method_cache(30) @method_cache(30)
def get_remote_queue_name(self, queue_id): def get_remote_queue_name(self, queue_id):
""" Return the remote queue name """Return the name of the remote celery queue for this node.
throws Exception if there is no worker on the queue. throws Exception if there is no worker on the queue.
Until the cache provide reult there can be dead quques. Until the cache provide reult there can be dead queues.
""" """
if vm_tasks.check_queue(self.host.hostname, queue_id): if vm_tasks.check_queue(self.host.hostname, queue_id):
self.node_online()
return self.host.hostname + "." + queue_id return self.host.hostname + "." + queue_id
else: else:
if self.enabled is True:
self.node_offline()
raise WorkerNotFound() raise WorkerNotFound()
def node_online(self):
"""Create activity and log entry when node reappears.
"""
try:
act = self.activity_log.order_by('-pk')[0]
except IndexError:
pass # no monitoring activity at all
else:
logger.debug("The last activity was %s" % act)
if act.activity_code.endswith("offline"):
act = NodeActivity.create(code_suffix='monitor_succes_online',
node=self, user=None)
act.started = timezone.now()
act.finished = timezone.now()
act.succeeded = True
act.save()
logger.info("Node %s is ONLINE." % self.name)
self.get_num_cores(invalidate_cache=True)
self.get_ram_size(invalidate_cache=True)
def node_offline(self):
"""Called when a node disappears.
If the node is not already offline, record an activity and a log entry.
"""
try:
act = self.activity_log.order_by('-pk')[0]
except IndexError:
pass # no activity at all
else:
logger.debug("The last activity was %s" % act)
if act.activity_code.endswith("offline"):
return
act = NodeActivity.create(code_suffix='monitor_failed_offline',
node=self, user=None)
act.started = timezone.now()
act.finished = timezone.now()
act.succeeded = False
act.save()
logger.critical("Node %s is OFFLINE%s.", self.name,
", but enabled" if self.enabled else "")
# TODO: check if we should reschedule any VMs?
def remote_query(self, task, timeout=30, raise_=False, default=None): def remote_query(self, task, timeout=30, raise_=False, default=None):
"""Query the given task, and get the result. """Query the given task, and get the result.
If the result is not ready in timeout secs, return default value or If the result is not ready or worker not reachable
raise a TimeoutError.""" in timeout secs, return default value or raise a
r = task.apply_async( TimeoutError or WorkerNotFound exception.
queue=self.get_remote_queue_name('vm'), expires=timeout + 60) """
try: try:
r = task.apply_async(
queue=self.get_remote_queue_name('vm'), expires=timeout + 60)
return r.get(timeout=timeout) return r.get(timeout=timeout)
except TimeoutError: except (TimeoutError, WorkerNotFound):
if raise_: if raise_:
raise raise
else: else:
...@@ -175,6 +236,11 @@ class Node(TimeStampedModel): ...@@ -175,6 +236,11 @@ class Node(TimeStampedModel):
return float(self.get_monitor_info()["memory.usage"]) / 100 return float(self.get_monitor_info()["memory.usage"]) / 100
def update_vm_states(self): def update_vm_states(self):
"""Update state of Instances running on this Node.
Query state of all libvirt domains, and notify Instances by their
vm_state_changed hook.
"""
domains = {} domains = {}
domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5) domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5)
if domain_list is None: if domain_list is None:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment