Commit 8aaa4d14 by Guba Sándor

node: set tasks to user fast queues

parent 0e85c19a
...@@ -76,7 +76,7 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -76,7 +76,7 @@ class Node(OperatedMixin, TimeStampedModel):
Check if node is online by queue is available. Check if node is online by queue is available.
""" """
try: try:
self.get_remote_queue_name("vm") self.get_remote_queue_name("vm", "fast")
except: except:
return False return False
else: else:
...@@ -88,6 +88,7 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -88,6 +88,7 @@ class Node(OperatedMixin, TimeStampedModel):
@method_cache(300) @method_cache(300)
def get_info(self): def get_info(self):
return self.remote_query(vm_tasks.get_info, return self.remote_query(vm_tasks.get_info,
priority='fast',
default={'core_num': '', default={'core_num': '',
'ram_size': '0', 'ram_size': '0',
'architecture': ''}) 'architecture': ''})
...@@ -146,16 +147,19 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -146,16 +147,19 @@ class Node(OperatedMixin, TimeStampedModel):
return self.ram_size * self.overcommit return self.ram_size * self.overcommit
@method_cache(30) @method_cache(30)
def get_remote_queue_name(self, queue_id): def get_remote_queue_name(self, queue_id, priority=None):
"""Returns the name of the remote celery queue for this node. """Returns the name of the remote celery queue for this node.
Throws Exception if there is no worker on the queue. Throws Exception if there is no worker on the queue.
The result may include dead queues because of caching. The result may include dead queues because of caching.
""" """
if vm_tasks.check_queue(self.host.hostname, queue_id): if vm_tasks.check_queue(self.host.hostname, queue_id, priority):
queue_name = self.host.hostname + "." + queue_id
if priority is not None:
queue_name = queue_name + "." + priority
self.node_online() self.node_online()
return self.host.hostname + "." + queue_id return queue_name
else: else:
if self.enabled: if self.enabled:
self.node_offline() self.node_offline()
...@@ -205,7 +209,8 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -205,7 +209,8 @@ class Node(OperatedMixin, TimeStampedModel):
", but enabled" if self.enabled else "") ", but enabled" if self.enabled else "")
# TODO: check if we should reschedule any VMs? # TODO: check if we should reschedule any VMs?
def remote_query(self, task, timeout=30, raise_=False, default=None): def remote_query(self, task, timeout=30, priority=None, raise_=False,
default=None):
"""Query the given task, and get the result. """Query the given task, and get the result.
If the result is not ready or worker not reachable If the result is not ready or worker not reachable
...@@ -214,7 +219,8 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -214,7 +219,8 @@ class Node(OperatedMixin, TimeStampedModel):
""" """
try: try:
r = task.apply_async( r = task.apply_async(
queue=self.get_remote_queue_name('vm'), expires=timeout + 60) queue=self.get_remote_queue_name('vm', priority),
expires=timeout + 60)
return r.get(timeout=timeout) return r.get(timeout=timeout)
except (TimeoutError, WorkerNotFound): except (TimeoutError, WorkerNotFound):
if raise_: if raise_:
...@@ -227,7 +233,8 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -227,7 +233,8 @@ class Node(OperatedMixin, TimeStampedModel):
try: try:
handler = GraphiteHandler() handler = GraphiteHandler()
except RuntimeError: except RuntimeError:
return self.remote_query(vm_tasks.get_node_metrics, 30) return self.remote_query(vm_tasks.get_node_metrics, timeout=30,
priority="fast")
query = Query() query = Query()
query.set_target(self.host.hostname + ".circle") query.set_target(self.host.hostname + ".circle")
...@@ -276,7 +283,8 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -276,7 +283,8 @@ class Node(OperatedMixin, TimeStampedModel):
vm_state_changed hook. vm_state_changed hook.
""" """
domains = {} domains = {}
domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5) domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5,
priority="fast")
if domain_list is None: if domain_list is None:
logger.info("Monitoring failed at: %s", self.name) logger.info("Monitoring failed at: %s", self.name)
return return
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment