Commit feeea6da by Szabolcs Gelencser

Comment unused celery tasks, configure local celery transport

parent 1d8b8edb
File added
No preview for this file type
...@@ -368,6 +368,8 @@ DJANGO_APPS = ( ...@@ -368,6 +368,8 @@ DJANGO_APPS = (
# Admin panel and documentation: # Admin panel and documentation:
'django.contrib.admin', 'django.contrib.admin',
# 'django.contrib.admindocs', # 'django.contrib.admindocs',
'kombu.transport.django',
'djcelery',
) )
THIRD_PARTY_APPS = ( THIRD_PARTY_APPS = (
...@@ -399,6 +401,13 @@ LOCAL_APPS = ( ...@@ -399,6 +401,13 @@ LOCAL_APPS = (
INSTALLED_APPS = DJANGO_APPS + THIRD_PARTY_APPS + LOCAL_APPS INSTALLED_APPS = DJANGO_APPS + THIRD_PARTY_APPS + LOCAL_APPS
########## END APP CONFIGURATION ########## END APP CONFIGURATION
BROKER_URL = "django://" # tell kombu to use the Django database as the message queue
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS = ('vm.tasks',)
AUTHENTICATION_BACKENDS = ('openstack_auth.backend.KeystoneBackend',) AUTHENTICATION_BACKENDS = ('openstack_auth.backend.KeystoneBackend',)
AUTHENTICATION_URLS = ['openstack_auth.urls'] AUTHENTICATION_URLS = ['openstack_auth.urls']
AUTH_USER_MODEL = 'openstack_auth.User' AUTH_USER_MODEL = 'openstack_auth.User'
......
...@@ -38,7 +38,7 @@ from django.utils.functional import Promise ...@@ -38,7 +38,7 @@ from django.utils.functional import Promise
from django.utils.translation import ugettext_lazy as _, ugettext_noop from django.utils.translation import ugettext_lazy as _, ugettext_noop
from jsonfield import JSONField from jsonfield import JSONField
from manager.mancelery import celery # from manager.mancelery import celery
from model_utils.models import TimeStampedModel from model_utils.models import TimeStampedModel
from openstack_auth.user import User from openstack_auth.user import User
...@@ -242,7 +242,7 @@ class ActivityModel(TimeStampedModel): ...@@ -242,7 +242,7 @@ class ActivityModel(TimeStampedModel):
return 'failed' return 'failed'
@celery.task() # @celery.task()
def compute_cached(method, instance, memcached_seconds, def compute_cached(method, instance, memcached_seconds,
key, start, *args, **kwargs): key, start, *args, **kwargs):
"""Compute and store actual value of cached method.""" """Compute and store actual value of cached method."""
......
...@@ -47,7 +47,7 @@ from common.models import ( ...@@ -47,7 +47,7 @@ from common.models import (
split_activity_code, split_activity_code,
) )
from firewall.models import Vlan, Host, Rule from firewall.models import Vlan, Host, Rule
from manager.scheduler import SchedulerError # from manager.scheduler import SchedulerError
from network.models import DefaultPublicRouter, DefaultPublicRoutedNet from network.models import DefaultPublicRouter, DefaultPublicRoutedNet
from openstack_api.nova import Server from openstack_api.nova import Server
from request.forms import TemplateRequestForm from request.forms import TemplateRequestForm
......
...@@ -22,7 +22,7 @@ import django.conf ...@@ -22,7 +22,7 @@ import django.conf
from django.core.cache import cache from django.core.cache import cache
from celery.exceptions import TimeoutError from celery.exceptions import TimeoutError
from manager.mancelery import celery # from manager.mancelery import celery
from common.models import WorkerNotFound from common.models import WorkerNotFound
settings = django.conf.settings.FIREWALL_SETTINGS settings = django.conf.settings.FIREWALL_SETTINGS
...@@ -60,7 +60,7 @@ def get_firewall_queues(): ...@@ -60,7 +60,7 @@ def get_firewall_queues():
return list(retval) return list(retval)
@celery.task #@celery.task
def reloadtask_worker(): def reloadtask_worker():
from firewall.fw import BuildFirewall, dhcp, dns, ipset, vlan from firewall.fw import BuildFirewall, dhcp, dns, ipset, vlan
from remote_tasks import (reload_dns, reload_dhcp, reload_firewall, from remote_tasks import (reload_dns, reload_dhcp, reload_firewall,
...@@ -91,7 +91,7 @@ def reloadtask_worker(): ...@@ -91,7 +91,7 @@ def reloadtask_worker():
lambda: (list(ipset()), )) lambda: (list(ipset()), ))
@celery.task #@celery.task
def reloadtask(type='Host', timeout=15, sync=False): def reloadtask(type='Host', timeout=15, sync=False):
reload = { reload = {
'Host': ['dns', 'dhcp', 'firewall'], 'Host': ['dns', 'dhcp', 'firewall'],
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from manager.mancelery import celery # from manager.mancelery import celery
def check_queue(firewall, queue_id, priority): def check_queue(firewall, queue_id, priority):
...@@ -36,32 +36,32 @@ def check_queue(firewall, queue_id, priority): ...@@ -36,32 +36,32 @@ def check_queue(firewall, queue_id, priority):
return queue_name in queue_names return queue_name in queue_names
@celery.task(name='firewall.reload_dns') #@celery.task(name='firewall.reload_dns')
def reload_dns(data): def reload_dns(data):
pass pass
@celery.task(name='firewall.reload_firewall') #@celery.task(name='firewall.reload_firewall')
def reload_firewall(data4, data6): def reload_firewall(data4, data6):
pass pass
@celery.task(name='firewall.reload_firewall_vlan') #@celery.task(name='firewall.reload_firewall_vlan')
def reload_firewall_vlan(data): def reload_firewall_vlan(data):
pass pass
@celery.task(name='firewall.reload_dhcp') #@celery.task(name='firewall.reload_dhcp')
def reload_dhcp(data): def reload_dhcp(data):
pass pass
@celery.task(name='firewall.reload_blacklist') #@celery.task(name='firewall.reload_blacklist')
def reload_blacklist(data): def reload_blacklist(data):
pass pass
@celery.task(name='firewall.get_dhcp_clients') #@celery.task(name='firewall.get_dhcp_clients')
def get_dhcp_clients(): def get_dhcp_clients():
# {'00:21:5a:73:72:cd': {'interface': 'OFF', 'ip': None, 'hostname': None}} # {'00:21:5a:73:72:cd': {'interface': 'OFF', 'ip': None, 'hostname': None}}
pass pass
# Copyright 2014 Budapest University of Technology and Economics (BME IK) # # Copyright 2014 Budapest University of Technology and Economics (BME IK)
# #
# # This file is part of CIRCLE Cloud.
# #
# # CIRCLE is free software: you can redistribute it and/or modify it under
# # the terms of the GNU General Public License as published by the Free
# # Software Foundation, either version 3 of the License, or (at your option)
# # any later version.
# #
# # CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# # details.
# #
# # You should have received a copy of the GNU General Public License along
# # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
# #
# This file is part of CIRCLE Cloud. # from celery import Celery
# from celery.signals import worker_ready
# from datetime import timedelta
# from kombu import Queue, Exchange
# from os import getenv
# #
# CIRCLE is free software: you can redistribute it and/or modify it under # HOSTNAME = "localhost"
# the terms of the GNU General Public License as published by the Free # QUEUE_NAME = HOSTNAME + '.man'
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
# #
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
# #
# You should have received a copy of the GNU General Public License along # celery = Celery('manager',
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # broker=getenv("AMQP_URI"),
# include=['vm.tasks.local_tasks',
from celery import Celery # 'vm.tasks.local_periodic_tasks',
from celery.signals import worker_ready # 'vm.tasks.local_agent_tasks',
from datetime import timedelta # 'storage.tasks.local_tasks',
from kombu import Queue, Exchange # 'storage.tasks.periodic_tasks',
from os import getenv # 'firewall.tasks.local_tasks',
# 'dashboard.tasks.local_periodic_tasks',
HOSTNAME = "localhost" # ])
QUEUE_NAME = HOSTNAME + '.man' #
# celery.conf.update(
# CELERY_RESULT_BACKEND='amqp',
celery = Celery('manager', # CELERY_TASK_RESULT_EXPIRES=300,
broker=getenv("AMQP_URI"), # CELERY_QUEUES=(
include=['vm.tasks.local_tasks', # Queue(HOSTNAME + '.man', Exchange('manager', type='direct'),
'vm.tasks.local_periodic_tasks', # routing_key="manager"),
'vm.tasks.local_agent_tasks', # ),
'storage.tasks.local_tasks', # CELERYBEAT_SCHEDULE={
'storage.tasks.periodic_tasks', # 'storage.periodic_tasks': {
'firewall.tasks.local_tasks', # 'task': 'storage.tasks.periodic_tasks.garbage_collector',
'dashboard.tasks.local_periodic_tasks', # 'schedule': timedelta(hours=1),
]) # 'options': {'queue': 'localhost.man'}
# },
celery.conf.update( # 'dashboard.send_email_notifications': {
CELERY_RESULT_BACKEND='amqp', # 'task': 'dashboard.tasks.local_periodic_tasks.'
CELERY_TASK_RESULT_EXPIRES=300, # 'send_email_notifications',
CELERY_QUEUES=( # 'schedule': timedelta(hours=24),
Queue(HOSTNAME + '.man', Exchange('manager', type='direct'), # 'options': {'queue': 'localhost.man'}
routing_key="manager"), # },
), # }
CELERYBEAT_SCHEDULE={ #
'storage.periodic_tasks': { # )
'task': 'storage.tasks.periodic_tasks.garbage_collector', #
'schedule': timedelta(hours=1), #
'options': {'queue': 'localhost.man'} # @worker_ready.connect()
}, # def cleanup_tasks(conf=None, **kwargs):
'dashboard.send_email_notifications': { # '''Discard all task and clean up activity.'''
'task': 'dashboard.tasks.local_periodic_tasks.' # from vm.models.activity import cleanup
'send_email_notifications', # cleanup(queue_name=QUEUE_NAME)
'schedule': timedelta(hours=24),
'options': {'queue': 'localhost.man'}
},
}
)
@worker_ready.connect()
def cleanup_tasks(conf=None, **kwargs):
'''Discard all task and clean up activity.'''
from vm.models.activity import cleanup
cleanup(queue_name=QUEUE_NAME)
# Copyright 2014 Budapest University of Technology and Economics (BME IK) # # Copyright 2014 Budapest University of Technology and Economics (BME IK)
# #
# # This file is part of CIRCLE Cloud.
# #
# # CIRCLE is free software: you can redistribute it and/or modify it under
# # the terms of the GNU General Public License as published by the Free
# # Software Foundation, either version 3 of the License, or (at your option)
# # any later version.
# #
# # CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# # details.
# #
# # You should have received a copy of the GNU General Public License along
# # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
# #
# This file is part of CIRCLE Cloud. # from celery import Celery
# from celery.signals import worker_ready
# from datetime import timedelta
# from kombu import Queue, Exchange
# from os import getenv
# #
# CIRCLE is free software: you can redistribute it and/or modify it under # HOSTNAME = "localhost"
# the terms of the GNU General Public License as published by the Free # QUEUE_NAME = HOSTNAME + '.monitor'
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
# #
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY # celery = Celery('monitor',
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # broker=getenv("AMQP_URI"),
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # include=['vm.tasks.local_periodic_tasks',
# details. # 'monitor.tasks.local_periodic_tasks',
# ])
# #
# You should have received a copy of the GNU General Public License along # celery.conf.update(
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # CELERY_RESULT_BACKEND='amqp',
# CELERY_TASK_RESULT_EXPIRES=300,
from celery import Celery # CELERY_QUEUES=(
from celery.signals import worker_ready # Queue(QUEUE_NAME, Exchange('monitor', type='direct'),
from datetime import timedelta # routing_key="monitor"),
from kombu import Queue, Exchange # ),
from os import getenv # CELERYBEAT_SCHEDULE={
# 'vm.update_domain_states': {
HOSTNAME = "localhost" # 'task': 'vm.tasks.local_periodic_tasks.update_domain_states',
QUEUE_NAME = HOSTNAME + '.monitor' # 'schedule': timedelta(seconds=10),
# 'options': {'queue': 'localhost.monitor'}
celery = Celery('monitor', # },
broker=getenv("AMQP_URI"), # 'monitor.measure_response_time': {
include=['vm.tasks.local_periodic_tasks', # 'task': 'monitor.tasks.local_periodic_tasks.'
'monitor.tasks.local_periodic_tasks', # 'measure_response_time',
]) # 'schedule': timedelta(seconds=30),
# 'options': {'queue': 'localhost.monitor'}
celery.conf.update( # },
CELERY_RESULT_BACKEND='amqp', # 'monitor.check_celery_queues': {
CELERY_TASK_RESULT_EXPIRES=300, # 'task': 'monitor.tasks.local_periodic_tasks.'
CELERY_QUEUES=( # 'check_celery_queues',
Queue(QUEUE_NAME, Exchange('monitor', type='direct'), # 'schedule': timedelta(seconds=60),
routing_key="monitor"), # 'options': {'queue': 'localhost.monitor'}
), # },
CELERYBEAT_SCHEDULE={ # 'monitor.instance_per_template': {
'vm.update_domain_states': { # 'task': 'monitor.tasks.local_periodic_tasks.'
'task': 'vm.tasks.local_periodic_tasks.update_domain_states', # 'instance_per_template',
'schedule': timedelta(seconds=10), # 'schedule': timedelta(seconds=30),
'options': {'queue': 'localhost.monitor'} # 'options': {'queue': 'localhost.monitor'}
}, # },
'monitor.measure_response_time': { # 'monitor.allocated_memory': {
'task': 'monitor.tasks.local_periodic_tasks.' # 'task': 'monitor.tasks.local_periodic_tasks.'
'measure_response_time', # 'allocated_memory',
'schedule': timedelta(seconds=30), # 'schedule': timedelta(seconds=30),
'options': {'queue': 'localhost.monitor'} # 'options': {'queue': 'localhost.monitor'}
}, # },
'monitor.check_celery_queues': { # }
'task': 'monitor.tasks.local_periodic_tasks.' #
'check_celery_queues', # )
'schedule': timedelta(seconds=60), #
'options': {'queue': 'localhost.monitor'} #
}, # @worker_ready.connect()
'monitor.instance_per_template': { # def cleanup_tasks(conf=None, **kwargs):
'task': 'monitor.tasks.local_periodic_tasks.' # '''Discard all task and clean up activity.'''
'instance_per_template', # from vm.models.activity import cleanup
'schedule': timedelta(seconds=30), # cleanup(queue_name=QUEUE_NAME)
'options': {'queue': 'localhost.monitor'}
},
'monitor.allocated_memory': {
'task': 'monitor.tasks.local_periodic_tasks.'
'allocated_memory',
'schedule': timedelta(seconds=30),
'options': {'queue': 'localhost.monitor'}
},
}
)
@worker_ready.connect()
def cleanup_tasks(conf=None, **kwargs):
'''Discard all task and clean up activity.'''
from vm.models.activity import cleanup
cleanup(queue_name=QUEUE_NAME)
# Copyright 2014 Budapest University of Technology and Economics (BME IK) # # Copyright 2014 Budapest University of Technology and Economics (BME IK)
# # #
# This file is part of CIRCLE Cloud. # # This file is part of CIRCLE Cloud.
# # #
# CIRCLE is free software: you can redistribute it and/or modify it under # # CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free # # the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) # # Software Foundation, either version 3 of the License, or (at your option)
# any later version. # # any later version.
# # #
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY # # CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details. # # details.
# # #
# You should have received a copy of the GNU General Public License along # # You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
#
from logging import getLogger # from logging import getLogger
#
from django.utils.translation import ugettext_noop # from django.utils.translation import ugettext_noop
#
from common.models import HumanReadableException # from common.models import HumanReadableException
#
logger = getLogger(__name__) # logger = getLogger(__name__)
#
#
class SchedulerError(HumanReadableException): # class SchedulerError(HumanReadableException):
admin_message = None # admin_message = None
#
def __init__(self, params=None, level=None, **kwargs): # def __init__(self, params=None, level=None, **kwargs):
kwargs.update(params or {}) # kwargs.update(params or {})
super(SchedulerError, self).__init__( # super(SchedulerError, self).__init__(
level, self.message, self.admin_message or self.message, # level, self.message, self.admin_message or self.message,
kwargs) # kwargs)
#
#
class NotEnoughMemoryException(SchedulerError): # class NotEnoughMemoryException(SchedulerError):
message = ugettext_noop( # message = ugettext_noop(
"The resources required for launching the virtual machine are not " # "The resources required for launching the virtual machine are not "
"available currently. Please try again later.") # "available currently. Please try again later.")
#
admin_message = ugettext_noop( # admin_message = ugettext_noop(
"The required free memory for launching the virtual machine is not " # "The required free memory for launching the virtual machine is not "
"available on any usable node currently. Please try again later.") # "available on any usable node currently. Please try again later.")
#
#
class TraitsUnsatisfiableException(SchedulerError): # class TraitsUnsatisfiableException(SchedulerError):
message = ugettext_noop( # message = ugettext_noop(
"No node can satisfy the required traits of the " # "No node can satisfy the required traits of the "
"new virtual machine currently.") # "new virtual machine currently.")
#
#
def select_node(instance, nodes): # def select_node(instance, nodes):
''' Select a node for hosting an instance based on its requirements. # ''' Select a node for hosting an instance based on its requirements.
''' # '''
# check required traits # # check required traits
nodes = [n for n in nodes # nodes = [n for n in nodes
if n.schedule_enabled and n.online and # if n.schedule_enabled and n.online and
has_traits(instance.req_traits.all(), n)] # has_traits(instance.req_traits.all(), n)]
if not nodes: # if not nodes:
logger.warning('select_node: no usable node for %s', unicode(instance)) # logger.warning('select_node: no usable node for %s', unicode(instance))
raise TraitsUnsatisfiableException() # raise TraitsUnsatisfiableException()
#
# check required RAM # # check required RAM
nodes = [n for n in nodes if has_enough_ram(instance.ram_size, n)] # nodes = [n for n in nodes if has_enough_ram(instance.ram_size, n)]
if not nodes: # if not nodes:
logger.warning('select_node: no enough RAM for %s', unicode(instance)) # logger.warning('select_node: no enough RAM for %s', unicode(instance))
raise NotEnoughMemoryException() # raise NotEnoughMemoryException()
#
# sort nodes first by processor usage, then priority # # sort nodes first by processor usage, then priority
nodes.sort(key=lambda n: n.priority, reverse=True) # nodes.sort(key=lambda n: n.priority, reverse=True)
nodes.sort(key=free_cpu_time, reverse=True) # nodes.sort(key=free_cpu_time, reverse=True)
result = nodes[0] # result = nodes[0]
#
logger.info('select_node: %s for %s', unicode(result), unicode(instance)) # logger.info('select_node: %s for %s', unicode(result), unicode(instance))
return result # return result
#
#
def has_traits(traits, node): # def has_traits(traits, node):
"""True, if the node has all specified traits; otherwise, false. # """True, if the node has all specified traits; otherwise, false.
""" # """
traits = set(traits) # traits = set(traits)
return traits.issubset(node.traits.all()) # return traits.issubset(node.traits.all())
#
#
def has_enough_ram(ram_size, node): # def has_enough_ram(ram_size, node):
"""True, if the node has enough memory to accomodate a guest requiring # """True, if the node has enough memory to accomodate a guest requiring
ram_size mebibytes of memory; otherwise, false. # ram_size mebibytes of memory; otherwise, false.
""" # """
ram_size = ram_size * 1024 * 1024 # ram_size = ram_size * 1024 * 1024
try: # try:
total = node.ram_size # total = node.ram_size
used = node.byte_ram_usage # used = node.byte_ram_usage
unused = total - used # unused = total - used
#
overcommit = node.ram_size_with_overcommit # overcommit = node.ram_size_with_overcommit
reserved = node.allocated_ram # reserved = node.allocated_ram
free = overcommit - reserved # free = overcommit - reserved
#
retval = ram_size < unused and ram_size < free # retval = ram_size < unused and ram_size < free
#
logger.debug('has_enough_ram(%d, %s)=%s (total=%s unused=%s' # logger.debug('has_enough_ram(%d, %s)=%s (total=%s unused=%s'
' overcommit=%s free=%s free_ok=%s overcommit_ok=%s)', # ' overcommit=%s free=%s free_ok=%s overcommit_ok=%s)',
ram_size, node, retval, total, unused, overcommit, free, # ram_size, node, retval, total, unused, overcommit, free,
ram_size < unused, ram_size < free) # ram_size < unused, ram_size < free)
return retval # return retval
except TypeError as e: # except TypeError as e:
logger.exception('Got incorrect monitoring data for node %s. %s', # logger.exception('Got incorrect monitoring data for node %s. %s',
unicode(node), unicode(e)) # unicode(node), unicode(e))
return False # return False
#
#
def free_cpu_time(node): # def free_cpu_time(node):
"""Get an indicator number for idle processor time on the node. # """Get an indicator number for idle processor time on the node.
#
Higher values indicate more idle time. # Higher values indicate more idle time.
""" # """
try: # try:
activity = node.cpu_usage / 100 # activity = node.cpu_usage / 100
inactivity = 1 - activity # inactivity = 1 - activity
cores = node.num_cores # cores = node.num_cores
return cores * inactivity # return cores * inactivity
except TypeError as e: # except TypeError as e:
logger.warning('Got incorrect monitoring data for node %s. %s', # logger.warning('Got incorrect monitoring data for node %s. %s',
unicode(node), unicode(e)) # unicode(node), unicode(e))
return False # monitoring data is incorrect # return False # monitoring data is incorrect
# Copyright 2014 Budapest University of Technology and Economics (BME IK) # # Copyright 2014 Budapest University of Technology and Economics (BME IK)
# #
# # This file is part of CIRCLE Cloud.
# #
# # CIRCLE is free software: you can redistribute it and/or modify it under
# # the terms of the GNU General Public License as published by the Free
# # Software Foundation, either version 3 of the License, or (at your option)
# # any later version.
# #
# # CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# # details.
# #
# # You should have received a copy of the GNU General Public License along
# # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
# #
# This file is part of CIRCLE Cloud. # from celery import Celery
# from celery.signals import worker_ready
# from datetime import timedelta
# from kombu import Queue, Exchange
# from os import getenv
# #
# CIRCLE is free software: you can redistribute it and/or modify it under # HOSTNAME = "localhost"
# the terms of the GNU General Public License as published by the Free # QUEUE_NAME = HOSTNAME + '.man.slow'
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
# #
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY # celery = Celery('manager.slow',
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # broker=getenv("AMQP_URI"),
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # include=['vm.tasks.local_tasks',
# details. # 'vm.tasks.local_periodic_tasks',
# 'storage.tasks.local_tasks',
# 'storage.tasks.periodic_tasks',
# ])
# #
# You should have received a copy of the GNU General Public License along # celery.conf.update(
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # CELERY_RESULT_BACKEND='amqp',
# CELERY_TASK_RESULT_EXPIRES=300,
from celery import Celery # CELERY_QUEUES=(
from celery.signals import worker_ready # Queue(QUEUE_NAME, Exchange('manager.slow', type='direct'),
from datetime import timedelta # routing_key="manager.slow"),
from kombu import Queue, Exchange # ),
from os import getenv # CELERYBEAT_SCHEDULE={
# 'vm.garbage_collector': {
HOSTNAME = "localhost" # 'task': 'vm.tasks.local_periodic_tasks.garbage_collector',
QUEUE_NAME = HOSTNAME + '.man.slow' # 'schedule': timedelta(minutes=10),
# 'options': {'queue': 'localhost.man.slow'}
celery = Celery('manager.slow', # },
broker=getenv("AMQP_URI"), # }
include=['vm.tasks.local_tasks', #
'vm.tasks.local_periodic_tasks', # )
'storage.tasks.local_tasks', #
'storage.tasks.periodic_tasks', #
]) # @worker_ready.connect()
# def cleanup_tasks(conf=None, **kwargs):
celery.conf.update( # '''Discard all task and clean up activity.'''
CELERY_RESULT_BACKEND='amqp', # from vm.models.activity import cleanup
CELERY_TASK_RESULT_EXPIRES=300, # cleanup(queue_name=QUEUE_NAME)
CELERY_QUEUES=(
Queue(QUEUE_NAME, Exchange('manager.slow', type='direct'),
routing_key="manager.slow"),
),
CELERYBEAT_SCHEDULE={
'vm.garbage_collector': {
'task': 'vm.tasks.local_periodic_tasks.garbage_collector',
'schedule': timedelta(minutes=10),
'options': {'queue': 'localhost.man.slow'}
},
}
)
@worker_ready.connect()
def cleanup_tasks(conf=None, **kwargs):
'''Discard all task and clean up activity.'''
from vm.models.activity import cleanup
cleanup(queue_name=QUEUE_NAME)
...@@ -15,11 +15,11 @@ ...@@ -15,11 +15,11 @@
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from manager.mancelery import celery # from manager.mancelery import celery
from celery.contrib.abortable import AbortableTask from celery.contrib.abortable import AbortableTask
@celery.task #@celery.task
def check_queue(storage, queue_id, priority): def check_queue(storage, queue_id, priority):
''' Celery inspect job to check for active workers at queue_id ''' Celery inspect job to check for active workers at queue_id
return True/False return True/False
...@@ -38,34 +38,34 @@ def check_queue(storage, queue_id, priority): ...@@ -38,34 +38,34 @@ def check_queue(storage, queue_id, priority):
return queue_name in queue_names return queue_name in queue_names
@celery.task #@celery.task
def save_as(disk, timeout, user): def save_as(disk, timeout, user):
disk.save_disk_as(task_uuid=save_as.request.id, user=user, disk.save_disk_as(task_uuid=save_as.request.id, user=user,
disk=disk, timeout=timeout) disk=disk, timeout=timeout)
@celery.task #@celery.task
def clone(disk, new_disk, timeout, user): def clone(disk, new_disk, timeout, user):
disk.clone(task_uuid=save_as.request.id, user=user, disk.clone(task_uuid=save_as.request.id, user=user,
disk=new_disk, timeout=timeout) disk=new_disk, timeout=timeout)
@celery.task #@celery.task
def deploy(disk, user): def deploy(disk, user):
disk.deploy(task_uuid=deploy.request.id, user=user) disk.deploy(task_uuid=deploy.request.id, user=user)
@celery.task #@celery.task
def destroy(disk, user): def destroy(disk, user):
disk.destroy(task_uuid=destroy.request.id, user=user) disk.destroy(task_uuid=destroy.request.id, user=user)
@celery.task #@celery.task
def restore(disk, user): def restore(disk, user):
disk.restore(task_uuid=restore.request.id, user=user) disk.restore(task_uuid=restore.request.id, user=user)
@celery.task(base=AbortableTask, bind=True) #@celery.task(base=AbortableTask, bind=True)
def create_from_url(self, **kwargs): def create_from_url(self, **kwargs):
Disk = kwargs.pop('cls') Disk = kwargs.pop('cls')
Disk.create_from_url(url=kwargs.pop('url'), Disk.create_from_url(url=kwargs.pop('url'),
...@@ -74,7 +74,7 @@ def create_from_url(self, **kwargs): ...@@ -74,7 +74,7 @@ def create_from_url(self, **kwargs):
**kwargs) **kwargs)
@celery.task #@celery.task
def create_empty(Disk, instance, user, params): def create_empty(Disk, instance, user, params):
Disk.create_empty(instance, user, Disk.create_empty(instance, user,
task_uuid=create_empty.request.id, task_uuid=create_empty.request.id,
......
...@@ -15,74 +15,74 @@ ...@@ -15,74 +15,74 @@
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from manager.mancelery import celery # from manager.mancelery import celery
@celery.task(name='storagedriver.list') #@celery.task(name='storagedriver.list')
def list(dir): def list(dir):
pass pass
@celery.task(name='storagedriver.list_files') #@celery.task(name='storagedriver.list_files')
def list_files(dir): def list_files(dir):
pass pass
@celery.task(name='storagedriver.create') #@celery.task(name='storagedriver.create')
def create(disk_desc): def create(disk_desc):
pass pass
@celery.task(name='storagedriver.download') #@celery.task(name='storagedriver.download')
def download(disk_desc, url): def download(disk_desc, url):
pass pass
@celery.task(name='storagedriver.delete') #@celery.task(name='storagedriver.delete')
def delete(path): def delete(path):
pass pass
@celery.task(name='storagedriver.delete_dump') #@celery.task(name='storagedriver.delete_dump')
def delete_dump(path): def delete_dump(path):
pass pass
@celery.task(name='storagedriver.snapshot') #@celery.task(name='storagedriver.snapshot')
def snapshot(disk_desc): def snapshot(disk_desc):
pass pass
@celery.task(name='storagedriver.get') #@celery.task(name='storagedriver.get')
def get(path): def get(path):
pass pass
@celery.task(name='storagedriver.merge') #@celery.task(name='storagedriver.merge')
def merge(src_disk_desc, dst_disk_desc): def merge(src_disk_desc, dst_disk_desc):
pass pass
@celery.task(name='storagedriver.make_free_space') #@celery.task(name='storagedriver.make_free_space')
def make_free_space(datastore, percent): def make_free_space(datastore, percent):
pass pass
@celery.task(name='storagedriver.move_to_trash') #@celery.task(name='storagedriver.move_to_trash')
def move_to_trash(datastore, disk_path): def move_to_trash(datastore, disk_path):
pass pass
@celery.task(name='storagedriver.recover_from_trash') #@celery.task(name='storagedriver.recover_from_trash')
def recover_from_trash(datastore, disk_path): def recover_from_trash(datastore, disk_path):
pass pass
@celery.task(name='storagedriver.get_storage_stat') #@celery.task(name='storagedriver.get_storage_stat')
def get_storage_stat(path): def get_storage_stat(path):
pass pass
@celery.task(name='storagedriver.get_file_statistics') #@celery.task(name='storagedriver.get_file_statistics')
def get_file_statistics(datastore): def get_file_statistics(datastore):
pass pass
...@@ -32,7 +32,7 @@ from common.models import ( ...@@ -32,7 +32,7 @@ from common.models import (
HumanReadableObject, HumanReadableException, HumanReadableObject, HumanReadableException,
) )
from manager.mancelery import celery # from manager.mancelery import celery
logger = getLogger(__name__) logger = getLogger(__name__)
......
...@@ -45,7 +45,7 @@ from common.models import ( ...@@ -45,7 +45,7 @@ from common.models import (
create_readable, humanize_exception, HumanReadableException create_readable, humanize_exception, HumanReadableException
) )
from common.operations import Operation, register_operation from common.operations import Operation, register_operation
from manager.scheduler import SchedulerError # from manager.scheduler import SchedulerError
from .tasks.local_tasks import ( from .tasks.local_tasks import (
abortable_async_instance_operation, abortable_async_node_operation, abortable_async_instance_operation, abortable_async_node_operation,
) )
......
from test import *
\ No newline at end of file
...@@ -15,79 +15,79 @@ ...@@ -15,79 +15,79 @@
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from manager.mancelery import celery # from manager.mancelery import celery
@celery.task(name='agent.change_password') #@celery.task(name='agent.change_password')
def change_password(vm, password): def change_password(vm, password):
pass pass
@celery.task(name='agent.restart_networking') #@celery.task(name='agent.restart_networking')
def restart_networking(vm): def restart_networking(vm):
pass pass
@celery.task(name='agent.set_time') #@celery.task(name='agent.set_time')
def set_time(vm, time): def set_time(vm, time):
pass pass
@celery.task(name='agent.set_hostname') #@celery.task(name='agent.set_hostname')
def set_hostname(vm, time): def set_hostname(vm, time):
pass pass
@celery.task(name='agent.mount_store') #@celery.task(name='agent.mount_store')
def mount_store(vm, host, username, password): def mount_store(vm, host, username, password):
pass pass
@celery.task(name='agent.cleanup') #@celery.task(name='agent.cleanup')
def cleanup(vm): def cleanup(vm):
pass pass
@celery.task(name='agent.start_access_server') #@celery.task(name='agent.start_access_server')
def start_access_server(vm): def start_access_server(vm):
pass pass
@celery.task(name='agent.update_legacy') #@celery.task(name='agent.update_legacy')
def update_legacy(vm, data, executable=None): def update_legacy(vm, data, executable=None):
pass pass
@celery.task(name='agent.append') #@celery.task(name='agent.append')
def append(vm, data, filename, chunk_number): def append(vm, data, filename, chunk_number):
pass pass
@celery.task(name='agent.update') #@celery.task(name='agent.update')
def update(vm, filename, executable, checksum): def update(vm, filename, executable, checksum):
pass pass
@celery.task(name='agent.add_keys') #@celery.task(name='agent.add_keys')
def add_keys(vm, keys): def add_keys(vm, keys):
pass pass
@celery.task(name='agent.del_keys') #@celery.task(name='agent.del_keys')
def del_keys(vm, keys): def del_keys(vm, keys):
pass pass
@celery.task(name='agent.get_keys') #@celery.task(name='agent.get_keys')
def get_keys(vm): def get_keys(vm):
pass pass
@celery.task(name='agent.send_expiration') #@celery.task(name='agent.send_expiration')
def send_expiration(vm, url): def send_expiration(vm, url):
pass pass
@celery.task(name='agent.change_ip') #@celery.task(name='agent.change_ip')
def change_ip(vm, interfaces, dns): def change_ip(vm, interfaces, dns):
pass pass
...@@ -16,10 +16,10 @@ ...@@ -16,10 +16,10 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from celery.contrib.abortable import AbortableTask from celery.contrib.abortable import AbortableTask
from manager.mancelery import celery # from manager.mancelery import celery
@celery.task(base=AbortableTask, bind=True) #@celery.task(base=AbortableTask, bind=True)
def abortable_async_instance_operation(task, operation_id, instance_pk, def abortable_async_instance_operation(task, operation_id, instance_pk,
activity_pk, allargs, auxargs): activity_pk, allargs, auxargs):
from vm.models import Instance, InstanceActivity from vm.models import Instance, InstanceActivity
...@@ -37,7 +37,7 @@ def abortable_async_instance_operation(task, operation_id, instance_pk, ...@@ -37,7 +37,7 @@ def abortable_async_instance_operation(task, operation_id, instance_pk,
return operation._exec_op(allargs, auxargs) return operation._exec_op(allargs, auxargs)
@celery.task(base=AbortableTask, bind=True) #@celery.task(base=AbortableTask, bind=True)
def abortable_async_node_operation(task, operation_id, node_pk, activity_pk, def abortable_async_node_operation(task, operation_id, node_pk, activity_pk,
allargs, auxargs): allargs, auxargs):
from vm.models import Node, NodeActivity from vm.models import Node, NodeActivity
......
...@@ -15,14 +15,14 @@ ...@@ -15,14 +15,14 @@
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from manager.mancelery import celery # from manager.mancelery import celery
@celery.task(name='netdriver.create') #@celery.task(name='netdriver.create')
def create(params): def create(params):
pass pass
@celery.task(name='netdriver.delete') #@celery.task(name='netdriver.delete')
def destroy(params): def destroy(params):
pass pass
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
from django.core.cache import cache from django.core.cache import cache
from logging import getLogger from logging import getLogger
from manager.mancelery import celery # from manager.mancelery import celery
logger = getLogger(__name__) logger = getLogger(__name__)
...@@ -62,126 +62,126 @@ def get_queues(): ...@@ -62,126 +62,126 @@ def get_queues():
return result return result
@celery.task(name='vmdriver.attach_disk') #@celery.task(name='vmdriver.attach_disk')
def attach_disk(vm, disk): def attach_disk(vm, disk):
pass pass
@celery.task(name='vmdriver.detach_disk') #@celery.task(name='vmdriver.detach_disk')
def detach_disk(vm, disk): def detach_disk(vm, disk):
pass pass
@celery.task(name='vmdriver.attach_network') #@celery.task(name='vmdriver.attach_network')
def attach_network(vm, net): def attach_network(vm, net):
pass pass
@celery.task(name='vmdriver.detach_network') #@celery.task(name='vmdriver.detach_network')
def detach_network(vm, net): def detach_network(vm, net):
pass pass
@celery.task(name='vmdriver.create') #@celery.task(name='vmdriver.create')
def deploy(params): def deploy(params):
pass pass
@celery.task(name='vmdriver.delete') #@celery.task(name='vmdriver.delete')
def destroy(params): def destroy(params):
pass pass
@celery.task(name='vmdriver.save') #@celery.task(name='vmdriver.save')
def sleep(params): def sleep(params):
pass pass
@celery.task(name='vmdriver.restore') #@celery.task(name='vmdriver.restore')
def wake_up(params): def wake_up(params):
pass pass
@celery.task(name='vmdriver.suspend') #@celery.task(name='vmdriver.suspend')
def suspend(params): def suspend(params):
pass pass
@celery.task(name='vmdriver.resume') #@celery.task(name='vmdriver.resume')
def resume(params): def resume(params):
pass pass
@celery.task(name='vmdriver.shutdown') #@celery.task(name='vmdriver.shutdown')
def shutdown(params): def shutdown(params):
pass pass
@celery.task(name='vmdriver.reset') #@celery.task(name='vmdriver.reset')
def reset(params): def reset(params):
pass pass
@celery.task(name='vmdriver.reboot') #@celery.task(name='vmdriver.reboot')
def reboot(params): def reboot(params):
pass pass
@celery.task(name='vmdriver.migrate') #@celery.task(name='vmdriver.migrate')
def migrate(params): def migrate(params):
pass pass
@celery.task(name='vmdriver.resize_disk') #@celery.task(name='vmdriver.resize_disk')
def resize_disk(params): def resize_disk(params):
pass pass
@celery.task(name='vmdriver.domain_info') #@celery.task(name='vmdriver.domain_info')
def domain_info(params): def domain_info(params):
pass pass
@celery.task(name='vmdriver.list_domains') #@celery.task(name='vmdriver.list_domains')
def list_domains(params): def list_domains(params):
pass pass
@celery.task(name='vmdriver.list_domains_info') #@celery.task(name='vmdriver.list_domains_info')
def list_domains_info(params): def list_domains_info(params):
pass pass
@celery.task(name='vmdriver.ping') #@celery.task(name='vmdriver.ping')
def ping(params): def ping(params):
pass pass
@celery.task(name='vmdriver.get_core_num') #@celery.task(name='vmdriver.get_core_num')
def get_core_num(params): def get_core_num(params):
pass pass
@celery.task(name='vmdriver.get_architecture') #@celery.task(name='vmdriver.get_architecture')
def get_architecture(): def get_architecture():
pass pass
@celery.task(name='vmdriver.get_ram_size') #@celery.task(name='vmdriver.get_ram_size')
def get_ram_size(params): def get_ram_size(params):
pass pass
@celery.task(name='vmdriver.get_info') #@celery.task(name='vmdriver.get_info')
def get_info(params): def get_info(params):
pass pass
@celery.task(name='vmdriver.get_node_metrics') #@celery.task(name='vmdriver.get_node_metrics')
def get_node_metrics(params): def get_node_metrics(params):
pass pass
@celery.task(name='vmdriver.screenshot') #@celery.task(name='vmdriver.screenshot')
def screenshot(params): def screenshot(params):
pass pass
...@@ -26,6 +26,7 @@ Django==1.11.6 ...@@ -26,6 +26,7 @@ Django==1.11.6
django-appconf==1.0.2 django-appconf==1.0.2
django-autocomplete-light==3.2.9 django-autocomplete-light==3.2.9
django-braces==1.11.0 django-braces==1.11.0
django-celery==3.2.2
django-crispy-forms==1.6.1 django-crispy-forms==1.6.1
django-model-utils==3.0.0 django-model-utils==3.0.0
django-pipeline==1.6.13 django-pipeline==1.6.13
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment