Commit 68dd404a by Guba Sándor

node,vm_tasks: added check_queue() method to get_remore_queue_name

This call can inspect wather the queue has active worker waiting for
tasks.
parent e3ddb284
...@@ -94,8 +94,16 @@ class Node(TimeStampedModel): ...@@ -94,8 +94,16 @@ class Node(TimeStampedModel):
""" """
return self.ram_size * self.overcommit return self.ram_size * self.overcommit
@method_cache(30)
def get_remote_queue_name(self, queue_id): def get_remote_queue_name(self, queue_id):
return self.host.hostname + "." + queue_id """ Return the remote queue name
throws Exception if there is no worker on the queue.
Until the cache provide reult there can be dead quques.
"""
if vm_tasks.check_queue(self.host.hostname, queue_id):
return self.host.hostname + "." + queue_id
else:
raise Exception("Worker not found.")
def remote_query(self, task, timeout=30, raise_=False, default=None): def remote_query(self, task, timeout=30, raise_=False, default=None):
"""Query the given task, and get the result. """Query the given task, and get the result.
......
from manager.mancelery import celery from manager.mancelery import celery
def check_queue(node_hostname, queue_id):
drivers = ['vmdriver', 'netdriver']
worker_list = [node_hostname + "." + d for d in drivers]
queue_name = node_hostname + "." + queue_id
inspect = celery.control.inspect(worker_list)
# v is List of List of queues dict
node_workers = [v for k, v in inspect.active_queues().iteritems()]
for worker in node_workers:
for queue in worker:
if queue['name'] == queue_name:
return True
return False
@celery.task(name='vmdriver.create') @celery.task(name='vmdriver.create')
def deploy(params): def deploy(params):
pass pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment