Commit e59f2b72 by Dudás Ádám Committed by Your Name

move local celery tasks beside remote tasks in vm and storage

parent 809d0cbb
...@@ -6,11 +6,11 @@ HOSTNAME = "localhost" ...@@ -6,11 +6,11 @@ HOSTNAME = "localhost"
celery = Celery('manager', backend='amqp', celery = Celery('manager', backend='amqp',
broker=getenv("AMQP_URI"), broker=getenv("AMQP_URI"),
include=['manager.vm_manager', 'manager.storage_manager']) include=['vm.tasks.local_tasks', 'storage.tasks.local_tasks'])
celery.conf.update( celery.conf.update(
CELERY_QUEUES=( CELERY_QUEUES=(
Queue(HOSTNAME + '.man', Exchange( Queue(HOSTNAME + '.man', Exchange('manager', type='direct'),
'manager', type='direct'), routing_key="manager"), routing_key="manager"),
) )
) )
...@@ -8,8 +8,7 @@ from django.utils.translation import ugettext_lazy as _ ...@@ -8,8 +8,7 @@ from django.utils.translation import ugettext_lazy as _
from model_utils.models import TimeStampedModel from model_utils.models import TimeStampedModel
from sizefield.models import FileSizeField from sizefield.models import FileSizeField
from manager import storage_manager from .tasks import local_tasks, remote_tasks
import tasks
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -115,7 +114,7 @@ class Disk(TimeStampedModel): ...@@ -115,7 +114,7 @@ class Disk(TimeStampedModel):
return u"%s (#%d)" % (self.name, self.id) return u"%s (#%d)" % (self.name, self.id)
def deploy_async(self): def deploy_async(self):
storage_manager.deploy.apply_async(self) local_tasks.deploy.apply_async(self)
def deploy(self): def deploy(self):
"""Reify the disk model on the associated data store. """Reify the disk model on the associated data store.
...@@ -138,7 +137,7 @@ class Disk(TimeStampedModel): ...@@ -138,7 +137,7 @@ class Disk(TimeStampedModel):
'base_name': self.base.name if self.base else None, 'base_name': self.base.name if self.base else None,
'type': 'snapshot' if self.type == 'qcow2-snap' else 'normal' 'type': 'snapshot' if self.type == 'qcow2-snap' else 'normal'
} }
tasks.create_disk.apply_async( remote_tasks.create_disk.apply_async(
args=[disk_desc], queue=self.datastore.hostname + ".storage").get() args=[disk_desc], queue=self.datastore.hostname + ".storage").get()
self.ready = True self.ready = True
self.save() self.save()
......
from .mancelery import celery from manager.mancelery import celery
@celery.task @celery.task
......
import celery from manager.mancelery import celery
import logging
logger = logging.getLogger(__name__)
@celery.task(name='storagedriver.list_disks') @celery.task(name='storagedriver.list_disks')
......
...@@ -14,9 +14,8 @@ from django.utils.translation import ugettext_lazy as _ ...@@ -14,9 +14,8 @@ from django.utils.translation import ugettext_lazy as _
from model_utils.models import TimeStampedModel from model_utils.models import TimeStampedModel
from netaddr import EUI from netaddr import EUI
from . import tasks from .tasks import local_tasks, remote_tasks
from firewall.models import Vlan, Host from firewall.models import Vlan, Host
from manager import vm_manager
from storage.models import Disk from storage.models import Disk
...@@ -482,7 +481,8 @@ class Instance(BaseResourceConfigModel, TimeStampedModel): ...@@ -482,7 +481,8 @@ class Instance(BaseResourceConfigModel, TimeStampedModel):
def deploy_async(self, user=None): def deploy_async(self, user=None):
""" Launch celery task to handle the job asynchronously. """ Launch celery task to handle the job asynchronously.
""" """
vm_manager.deploy.apply_async(args=[self, user], queue="localhost.man") local_tasks.deploy.apply_async(args=[self, user],
queue="localhost.man")
def deploy(self, user=None, task_uuid=None): def deploy(self, user=None, task_uuid=None):
""" Deploy new virtual machine with network """ Deploy new virtual machine with network
...@@ -507,8 +507,9 @@ class Instance(BaseResourceConfigModel, TimeStampedModel): ...@@ -507,8 +507,9 @@ class Instance(BaseResourceConfigModel, TimeStampedModel):
# Deploy VM on remote machine # Deploy VM on remote machine
act.update_state("DEPLOYING VM") act.update_state("DEPLOYING VM")
tasks.create.apply_async(args=[self.get_vm_desc()], queue_name = self.node.host.hostname + ".vm"
queue=self.node.host.hostname + ".vm").get() remote_tasks.create.apply_async(args=[self.get_vm_desc()],
queue=queue_name).get()
# Estabilish network connection (vmdriver) # Estabilish network connection (vmdriver)
act.update_state("DEPLOYING NET") act.update_state("DEPLOYING NET")
...@@ -517,13 +518,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel): ...@@ -517,13 +518,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel):
# Resume vm # Resume vm
act.update_state("BOOTING") act.update_state("BOOTING")
tasks.resume.apply_async(args=[self.vm_name], remote_tasks.resume.apply_async(args=[self.vm_name],
queue=self.node + ".vm").get() queue=self.node + ".vm").get()
act.finish(result='SUCCESS') act.finish(result='SUCCESS')
def stop_async(self, user=None): def stop_async(self, user=None):
vm_manager.stop.apply_async(args=[self, user], queue="localhost.man") local_tasks.stop.apply_async(args=[self, user], queue="localhost.man")
def stop(self, user=None, task_uuid=None): def stop(self, user=None, task_uuid=None):
act = InstanceActivity(activity_code='vm.Instance.stop') act = InstanceActivity(activity_code='vm.Instance.stop')
...@@ -532,11 +533,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel): ...@@ -532,11 +533,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel):
act.started = timezone.now() act.started = timezone.now()
act.task_uuid = task_uuid act.task_uuid = task_uuid
act.save() act.save()
tasks.stop.apply_async(args=[self.get_vm_desc()], queue_name = self.node.host.hostname + ".vm"
queue=self.node.host.hostname + ".vm").get() remote_tasks.stop.apply_async(args=[self.get_vm_desc()],
queue=queue_name).get()
def resume_async(self, user=None): def resume_async(self, user=None):
vm_manager.resume.apply_async(args=[self, user], queue="localhost.man") local_tasks.resume.apply_async(args=[self, user],
queue="localhost.man")
def resume(self, user=None, task_uuid=None): def resume(self, user=None, task_uuid=None):
act = InstanceActivity(activity_code='vm.Instance.resume') act = InstanceActivity(activity_code='vm.Instance.resume')
...@@ -545,12 +548,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel): ...@@ -545,12 +548,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel):
act.started = timezone.now() act.started = timezone.now()
act.task_uuid = task_uuid act.task_uuid = task_uuid
act.save() act.save()
tasks.resume.apply_async(args=[self.get_vm_desc()], queue_name = self.node.host.hostname + ".vm"
queue=self.node.host.hostname + ".vm").get() remote_tasks.resume.apply_async(args=[self.get_vm_desc()],
queue=queue_name).get()
def poweroff_async(self, user=None): def poweroff_async(self, user=None):
vm_manager.power_off.apply_async(args=[self, user], local_tasks.power_off.apply_async(args=[self, user],
queue="localhost.man") queue="localhost.man")
def poweroff(self, user=None, task_uuid=None): def poweroff(self, user=None, task_uuid=None):
act = InstanceActivity(activity_code='vm.Instance.power_off') act = InstanceActivity(activity_code='vm.Instance.power_off')
...@@ -559,13 +563,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel): ...@@ -559,13 +563,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel):
act.started = timezone.now() act.started = timezone.now()
act.task_uuid = task_uuid act.task_uuid = task_uuid
act.save() act.save()
tasks.power_off.apply_async(args=[self.get_vm_desc()], queue_name = self.node.host.hostname + ".vm"
queue=self.node.host.hostname + ".vm" remote_tasks.power_off.apply_async(args=[self.get_vm_desc()],
).get() queue=queue_name).get()
def restart_async(self, user=None): def restart_async(self, user=None):
vm_manager.restart.apply_async(args=[self, user], local_tasks.restart.apply_async(args=[self, user],
queue="localhost.man") queue="localhost.man")
def restart(self, user=None, task_uuid=None): def restart(self, user=None, task_uuid=None):
act = InstanceActivity(activity_code='vm.Instance.restart') act = InstanceActivity(activity_code='vm.Instance.restart')
...@@ -574,12 +578,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel): ...@@ -574,12 +578,13 @@ class Instance(BaseResourceConfigModel, TimeStampedModel):
act.started = timezone.now() act.started = timezone.now()
act.task_uuid = task_uuid act.task_uuid = task_uuid
act.save() act.save()
tasks.restart.apply_async(args=[self.get_vm_desc()], queue_name = self.node.host.hostname + ".vm"
queue=self.node.host.hostname + ".vm").get() remote_tasks.restart.apply_async(args=[self.get_vm_desc()],
queue=queue_name).get()
def save_as_async(self, user=None): def save_as_async(self, user=None):
vm_manager.save_as.apply_async( local_tasks.save_as.apply_async(args=[self, user],
args=[self, user], queue="localhost.man") queue="localhost.man")
def save_as(self, user=None, task_uuid=None): def save_as(self, user=None, task_uuid=None):
act = InstanceActivity(activity_code='vm.Instance.restart') act = InstanceActivity(activity_code='vm.Instance.restart')
...@@ -588,8 +593,9 @@ class Instance(BaseResourceConfigModel, TimeStampedModel): ...@@ -588,8 +593,9 @@ class Instance(BaseResourceConfigModel, TimeStampedModel):
act.started = timezone.now() act.started = timezone.now()
act.task_uuid = task_uuid act.task_uuid = task_uuid
act.save() act.save()
tasks.save_as.apply_async(args=[self.get_vm_desc()], queue_name = self.node.host.hostname + ".vm"
queue=self.node.host.hostname + ".vm").get() remote_tasks.save_as.apply_async(args=[self.get_vm_desc()],
queue=queue_name).get()
def renew(self, which='both'): def renew(self, which='both'):
"""Renew virtual machine instance leases. """Renew virtual machine instance leases.
......
from .mancelery import celery from manager.mancelery import celery
@celery.task @celery.task
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment