Commit 2d99cd14 by Guba Sándor

Merge branch 'feature-priority-queues'

Conflicts:
	circle/vm/models/instance.py
parents 5531552b 51461497
...@@ -59,11 +59,12 @@ class DataStore(Model): ...@@ -59,11 +59,12 @@ class DataStore(Model):
def __unicode__(self): def __unicode__(self):
return u'%s (%s)' % (self.name, self.path) return u'%s (%s)' % (self.name, self.path)
def get_remote_queue_name(self, queue_id, check_worker=True): def get_remote_queue_name(self, queue_id, priority=None,
check_worker=True):
logger.debug("Checking for storage queue %s.%s", logger.debug("Checking for storage queue %s.%s",
self.hostname, queue_id) self.hostname, queue_id)
if not check_worker or local_tasks.check_queue(self.hostname, if not check_worker or local_tasks.check_queue(self.hostname,
queue_id): queue_id, priority):
return self.hostname + '.' + queue_id return self.hostname + '.' + queue_id
else: else:
raise WorkerNotFound() raise WorkerNotFound()
...@@ -292,11 +293,13 @@ class Disk(AclBase, TimeStampedModel): ...@@ -292,11 +293,13 @@ class Disk(AclBase, TimeStampedModel):
'type': 'snapshot' if self.base else 'normal' 'type': 'snapshot' if self.base else 'normal'
} }
def get_remote_queue_name(self, queue_id='storage', check_worker=True): def get_remote_queue_name(self, queue_id='storage', priority=None,
check_worker=True):
"""Returns the proper queue name based on the datastore. """Returns the proper queue name based on the datastore.
""" """
if self.datastore: if self.datastore:
return self.datastore.get_remote_queue_name(queue_id, check_worker) return self.datastore.get_remote_queue_name(queue_id, priority,
check_worker)
else: else:
return None return None
......
...@@ -20,14 +20,16 @@ from celery.contrib.abortable import AbortableTask ...@@ -20,14 +20,16 @@ from celery.contrib.abortable import AbortableTask
@celery.task @celery.task
def check_queue(storage, queue_id): def check_queue(storage, queue_id, priority):
''' Celery inspect job to check for active workers at queue_id ''' Celery inspect job to check for active workers at queue_id
return True/False return True/False
''' '''
drivers = ['storage', 'download']
worker_list = [storage + "." + d for d in drivers]
queue_name = storage + "." + queue_id queue_name = storage + "." + queue_id
active_queues = celery.control.inspect(worker_list).active_queues() if priority is not None:
queue_name = queue_name + "." + priority
inspect = celery.control.inspect()
inspect.timeout = 0.1
active_queues = inspect.active_queues()
if active_queues is None: if active_queues is None:
return False return False
......
...@@ -630,12 +630,12 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -630,12 +630,12 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
'raw_data': "" if not self.raw_data else self.raw_data 'raw_data': "" if not self.raw_data else self.raw_data
} }
def get_remote_queue_name(self, queue_id): def get_remote_queue_name(self, queue_id, priority=None):
"""Get the remote worker queue name of this instance with the specified """Get the remote worker queue name of this instance with the specified
queue ID. queue ID.
""" """
if self.node: if self.node:
return self.node.get_remote_queue_name(queue_id) return self.node.get_remote_queue_name(queue_id, priority)
else: else:
raise Node.DoesNotExist() raise Node.DoesNotExist()
...@@ -796,7 +796,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -796,7 +796,7 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
net.shutdown() net.shutdown()
def delete_vm(self, timeout=15): def delete_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm', 'fast')
try: try:
return vm_tasks.destroy.apply_async(args=[self.vm_name], return vm_tasks.destroy.apply_async(args=[self.vm_name],
queue=queue_name queue=queue_name
...@@ -809,38 +809,38 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -809,38 +809,38 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
raise raise
def deploy_vm(self, timeout=15): def deploy_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.deploy.apply_async(args=[self.get_vm_desc()], return vm_tasks.deploy.apply_async(args=[self.get_vm_desc()],
queue=queue_name queue=queue_name
).get(timeout=timeout) ).get(timeout=timeout)
def migrate_vm(self, to_node, timeout=120): def migrate_vm(self, to_node, timeout=120):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.migrate.apply_async(args=[self.vm_name, return vm_tasks.migrate.apply_async(args=[self.vm_name,
to_node.host.hostname], to_node.host.hostname],
queue=queue_name queue=queue_name
).get(timeout=timeout) ).get(timeout=timeout)
def reboot_vm(self, timeout=5): def reboot_vm(self, timeout=5):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.reboot.apply_async(args=[self.vm_name], return vm_tasks.reboot.apply_async(args=[self.vm_name],
queue=queue_name queue=queue_name
).get(timeout=timeout) ).get(timeout=timeout)
def reset_vm(self, timeout=5): def reset_vm(self, timeout=5):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm', 'fast')
return vm_tasks.reset.apply_async(args=[self.vm_name], return vm_tasks.reset.apply_async(args=[self.vm_name],
queue=queue_name queue=queue_name
).get(timeout=timeout) ).get(timeout=timeout)
def resume_vm(self, timeout=15): def resume_vm(self, timeout=15):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.resume.apply_async(args=[self.vm_name], return vm_tasks.resume.apply_async(args=[self.vm_name],
queue=queue_name queue=queue_name
).get(timeout=timeout) ).get(timeout=timeout)
def shutdown_vm(self, task=None, step=5): def shutdown_vm(self, task=None, step=5):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm', 'slow')
logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name, logger.debug("RPC Shutdown at queue: %s, for vm: %s.", queue_name,
self.vm_name) self.vm_name)
remote = vm_tasks.shutdown.apply_async(kwargs={'name': self.vm_name}, remote = vm_tasks.shutdown.apply_async(kwargs={'name': self.vm_name},
...@@ -855,14 +855,14 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin, ...@@ -855,14 +855,14 @@ class Instance(AclBase, VirtualMachineDescModel, StatusModel, OperatedMixin,
raise Exception("Shutdown aborted by user.") raise Exception("Shutdown aborted by user.")
def suspend_vm(self, timeout=60): def suspend_vm(self, timeout=60):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.sleep.apply_async(args=[self.vm_name, return vm_tasks.sleep.apply_async(args=[self.vm_name,
self.mem_dump['path']], self.mem_dump['path']],
queue=queue_name queue=queue_name
).get(timeout=timeout) ).get(timeout=timeout)
def wake_up_vm(self, timeout=60): def wake_up_vm(self, timeout=60):
queue_name = self.get_remote_queue_name('vm') queue_name = self.get_remote_queue_name('vm', 'slow')
return vm_tasks.wake_up.apply_async(args=[self.vm_name, return vm_tasks.wake_up.apply_async(args=[self.vm_name,
self.mem_dump['path']], self.mem_dump['path']],
queue=queue_name queue=queue_name
......
...@@ -93,7 +93,7 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -93,7 +93,7 @@ class Node(OperatedMixin, TimeStampedModel):
Check if the node is online by checking that its queue is available. Check if the node is online by checking that its queue is available.
""" """
try: try:
self.get_remote_queue_name("vm") self.get_remote_queue_name("vm", "fast")
except: except:
return False return False
else: else:
...@@ -105,6 +105,7 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -105,6 +105,7 @@ class Node(OperatedMixin, TimeStampedModel):
@method_cache(300) @method_cache(300)
def get_info(self): def get_info(self):
return self.remote_query(vm_tasks.get_info, return self.remote_query(vm_tasks.get_info,
priority='fast',
default={'core_num': '', default={'core_num': '',
'ram_size': '0', 'ram_size': '0',
'architecture': ''}) 'architecture': ''})
...@@ -163,16 +164,19 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -163,16 +164,19 @@ class Node(OperatedMixin, TimeStampedModel):
return self.ram_size * self.overcommit return self.ram_size * self.overcommit
@method_cache(30) @method_cache(30)
def get_remote_queue_name(self, queue_id): def get_remote_queue_name(self, queue_id, priority=None):
"""Returns the name of the remote celery queue for this node. """Returns the name of the remote celery queue for this node.
Throws Exception if there is no worker on the queue. Throws Exception if there is no worker on the queue.
The result may include dead queues because of caching. The result may include dead queues because of caching.
""" """
if vm_tasks.check_queue(self.host.hostname, queue_id): if vm_tasks.check_queue(self.host.hostname, queue_id, priority):
queue_name = self.host.hostname + "." + queue_id
if priority is not None:
queue_name = queue_name + "." + priority
self.node_online() self.node_online()
return self.host.hostname + "." + queue_id return queue_name
else: else:
if self.enabled: if self.enabled:
self.node_offline() self.node_offline()
...@@ -222,7 +226,8 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -222,7 +226,8 @@ class Node(OperatedMixin, TimeStampedModel):
", but enabled" if self.enabled else "") ", but enabled" if self.enabled else "")
# TODO: check if we should reschedule any VMs? # TODO: check if we should reschedule any VMs?
def remote_query(self, task, timeout=30, raise_=False, default=None): def remote_query(self, task, timeout=30, priority=None, raise_=False,
default=None):
"""Query the given task, and get the result. """Query the given task, and get the result.
If the result is not ready or worker not reachable If the result is not ready or worker not reachable
...@@ -231,7 +236,8 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -231,7 +236,8 @@ class Node(OperatedMixin, TimeStampedModel):
""" """
try: try:
r = task.apply_async( r = task.apply_async(
queue=self.get_remote_queue_name('vm'), expires=timeout + 60) queue=self.get_remote_queue_name('vm', priority),
expires=timeout + 60)
return r.get(timeout=timeout) return r.get(timeout=timeout)
except (TimeoutError, WorkerNotFound): except (TimeoutError, WorkerNotFound):
if raise_: if raise_:
...@@ -244,7 +250,8 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -244,7 +250,8 @@ class Node(OperatedMixin, TimeStampedModel):
try: try:
handler = GraphiteHandler() handler = GraphiteHandler()
except RuntimeError: except RuntimeError:
return self.remote_query(vm_tasks.get_node_metrics, 30) return self.remote_query(vm_tasks.get_node_metrics, timeout=30,
priority="fast")
query = Query() query = Query()
query.set_target(self.host.hostname + ".circle") query.set_target(self.host.hostname + ".circle")
...@@ -309,7 +316,8 @@ class Node(OperatedMixin, TimeStampedModel): ...@@ -309,7 +316,8 @@ class Node(OperatedMixin, TimeStampedModel):
vm_state_changed hook. vm_state_changed hook.
""" """
domains = {} domains = {}
domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5) domain_list = self.remote_query(vm_tasks.list_domains_info, timeout=5,
priority="fast")
if domain_list is None: if domain_list is None:
logger.info("Monitoring failed at: %s", self.name) logger.info("Monitoring failed at: %s", self.name)
return return
......
...@@ -23,16 +23,19 @@ from manager.mancelery import celery ...@@ -23,16 +23,19 @@ from manager.mancelery import celery
logger = getLogger(__name__) logger = getLogger(__name__)
def check_queue(node_hostname, queue_id): def check_queue(node_hostname, queue_id, priority=None):
"""True if the queue is alive. """True if the queue is alive.
Example: check_queue('node01', 'vm'): Example: check_queue('node01', 'vm', 'slow'):
:param node_hostname: Short hostname of the node. :param node_hostname: Short hostname of the node.
:param queue_id: Queue identifier (eg. vm). :param queue_id: Queue identifier (eg. vm).
:param priority: can be 'slow', 'fast' or None
""" """
# drivers = ['vmdriver', 'netdriver', 'agentdriver'] # drivers = ['vmdriver', 'netdriver', 'agentdriver']
# worker_list = [node_hostname + "." + d for d in drivers] # worker_list = [node_hostname + "." + d for d in drivers]
queue_name = node_hostname + "." + queue_id queue_name = node_hostname + "." + queue_id
if priority is not None:
queue_name = queue_name + "." + priority
active_queues = get_queues() active_queues = get_queues()
if active_queues is None: if active_queues is None:
return False return False
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment