Commit 7e1854d6 by Guba Sándor

Merge branch 'feature-mancelery-queues'

Conflicts:
	circle/vm/operations.py
parents 4268ac9d 682ab018
...@@ -101,7 +101,7 @@ def pull(dir="~/circle/circle"): ...@@ -101,7 +101,7 @@ def pull(dir="~/circle/circle"):
@roles('portal') @roles('portal')
def update_portal(test=False): def update_portal(test=False):
"Update and restart portal+manager" "Update and restart portal+manager"
with _stopped("portal", "mancelery"): with _stopped("portal", "manager"):
pull() pull()
pip("circle", "~/circle/requirements.txt") pip("circle", "~/circle/requirements.txt")
migrate() migrate()
...@@ -113,7 +113,7 @@ def update_portal(test=False): ...@@ -113,7 +113,7 @@ def update_portal(test=False):
@roles('portal') @roles('portal')
def stop_portal(test=False): def stop_portal(test=False):
"Stop portal and manager" "Stop portal and manager"
_stop_services("portal", "mancelery") _stop_services("portal", "manager")
@roles('node') @roles('node')
......
...@@ -31,7 +31,6 @@ celery = Celery('manager', ...@@ -31,7 +31,6 @@ celery = Celery('manager',
'storage.tasks.local_tasks', 'storage.tasks.local_tasks',
'storage.tasks.periodic_tasks', 'storage.tasks.periodic_tasks',
'firewall.tasks.local_tasks', 'firewall.tasks.local_tasks',
'monitor.tasks.local_periodic_tasks',
'dashboard.tasks.local_periodic_tasks', 'dashboard.tasks.local_periodic_tasks',
]) ])
...@@ -42,20 +41,8 @@ celery.conf.update( ...@@ -42,20 +41,8 @@ celery.conf.update(
CELERY_QUEUES=( CELERY_QUEUES=(
Queue(HOSTNAME + '.man', Exchange('manager', type='direct'), Queue(HOSTNAME + '.man', Exchange('manager', type='direct'),
routing_key="manager"), routing_key="manager"),
Queue(HOSTNAME + '.monitor', Exchange('monitor', type='direct'),
routing_key="monitor"),
), ),
CELERYBEAT_SCHEDULE={ CELERYBEAT_SCHEDULE={
'vm.update_domain_states': {
'task': 'vm.tasks.local_periodic_tasks.update_domain_states',
'schedule': timedelta(seconds=10),
'options': {'queue': 'localhost.man'}
},
'vm.garbage_collector': {
'task': 'vm.tasks.local_periodic_tasks.garbage_collector',
'schedule': timedelta(minutes=10),
'options': {'queue': 'localhost.man'}
},
'storage.periodic_tasks': { 'storage.periodic_tasks': {
'task': 'storage.tasks.periodic_tasks.garbage_collector', 'task': 'storage.tasks.periodic_tasks.garbage_collector',
'schedule': timedelta(hours=1), 'schedule': timedelta(hours=1),
...@@ -67,24 +54,6 @@ celery.conf.update( ...@@ -67,24 +54,6 @@ celery.conf.update(
'schedule': timedelta(hours=24), 'schedule': timedelta(hours=24),
'options': {'queue': 'localhost.man'} 'options': {'queue': 'localhost.man'}
}, },
'monitor.measure_response_time': {
'task': 'monitor.tasks.local_periodic_tasks.'
'measure_response_time',
'schedule': timedelta(seconds=30),
'options': {'queue': 'localhost.man'}
},
'monitor.check_celery_queues': {
'task': 'monitor.tasks.local_periodic_tasks.'
'check_celery_queues',
'schedule': timedelta(seconds=60),
'options': {'queue': 'localhost.man'}
},
'monitor.instance_per_template': {
'task': 'monitor.tasks.local_periodic_tasks.'
'instance_per_template',
'schedule': timedelta(seconds=30),
'options': {'queue': 'localhost.man'}
},
} }
) )
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from celery import Celery
from datetime import timedelta
from kombu import Queue, Exchange
from os import getenv
HOSTNAME = "localhost"
CACHE_URI = getenv("CACHE_URI", "pylibmc://127.0.0.1:11211/")
celery = Celery('monitor',
broker=getenv("AMQP_URI"),
include=['vm.tasks.local_periodic_tasks',
'monitor.tasks.local_periodic_tasks',
])
celery.conf.update(
CELERY_RESULT_BACKEND='cache',
CELERY_CACHE_BACKEND=CACHE_URI,
CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=(
Queue(HOSTNAME + '.monitor', Exchange('monitor', type='direct'),
routing_key="monitor"),
),
CELERYBEAT_SCHEDULE={
'vm.update_domain_states': {
'task': 'vm.tasks.local_periodic_tasks.update_domain_states',
'schedule': timedelta(seconds=10),
'options': {'queue': 'localhost.monitor'}
},
'monitor.measure_response_time': {
'task': 'monitor.tasks.local_periodic_tasks.'
'measure_response_time',
'schedule': timedelta(seconds=30),
'options': {'queue': 'localhost.monitor'}
},
'monitor.check_celery_queues': {
'task': 'monitor.tasks.local_periodic_tasks.'
'check_celery_queues',
'schedule': timedelta(seconds=60),
'options': {'queue': 'localhost.monitor'}
},
'monitor.instance_per_template': {
'task': 'monitor.tasks.local_periodic_tasks.'
'instance_per_template',
'schedule': timedelta(seconds=30),
'options': {'queue': 'localhost.monitor'}
},
}
)
# Copyright 2014 Budapest University of Technology and Economics (BME IK)
#
# This file is part of CIRCLE Cloud.
#
# CIRCLE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# CIRCLE is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from celery import Celery
from datetime import timedelta
from kombu import Queue, Exchange
from os import getenv
HOSTNAME = "localhost"
CACHE_URI = getenv("CACHE_URI", "pylibmc://127.0.0.1:11211/")
celery = Celery('manager.slow',
broker=getenv("AMQP_URI"),
include=['vm.tasks.local_tasks',
'vm.tasks.local_periodic_tasks',
'storage.tasks.local_tasks',
'storage.tasks.periodic_tasks',
])
celery.conf.update(
CELERY_RESULT_BACKEND='cache',
CELERY_CACHE_BACKEND=CACHE_URI,
CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=(
Queue(HOSTNAME + '.man.slow', Exchange('manager.slow', type='direct'),
routing_key="manager.slow"),
),
CELERYBEAT_SCHEDULE={
'vm.garbage_collector': {
'task': 'vm.tasks.local_periodic_tasks.garbage_collector',
'schedule': timedelta(minutes=10),
'options': {'queue': 'localhost.man.slow'}
},
}
)
...@@ -218,6 +218,7 @@ class DownloadDiskOperation(InstanceOperation): ...@@ -218,6 +218,7 @@ class DownloadDiskOperation(InstanceOperation):
has_percentage = True has_percentage = True
required_perms = ('storage.download_disk', ) required_perms = ('storage.download_disk', )
accept_states = ('STOPPED', 'PENDING', 'RUNNING') accept_states = ('STOPPED', 'PENDING', 'RUNNING')
async_queue = "localhost.man.slow"
def _operation(self, user, url, task, activity, name=None): def _operation(self, user, url, task, activity, name=None):
activity.result = url activity.result = url
...@@ -368,6 +369,7 @@ class MigrateOperation(InstanceOperation): ...@@ -368,6 +369,7 @@ class MigrateOperation(InstanceOperation):
required_perms = () required_perms = ()
superuser_required = True superuser_required = True
accept_states = ('RUNNING', ) accept_states = ('RUNNING', )
async_queue = "localhost.man.slow"
def rollback(self, activity): def rollback(self, activity):
with activity.sub_activity( with activity.sub_activity(
...@@ -512,6 +514,7 @@ class SaveAsTemplateOperation(InstanceOperation): ...@@ -512,6 +514,7 @@ class SaveAsTemplateOperation(InstanceOperation):
abortable = True abortable = True
required_perms = ('vm.create_template', ) required_perms = ('vm.create_template', )
accept_states = ('RUNNING', 'PENDING', 'STOPPED') accept_states = ('RUNNING', 'PENDING', 'STOPPED')
async_queue = "localhost.man.slow"
def is_preferred(self): def is_preferred(self):
return (self.instance.is_base and return (self.instance.is_base and
...@@ -667,6 +670,7 @@ class SleepOperation(InstanceOperation): ...@@ -667,6 +670,7 @@ class SleepOperation(InstanceOperation):
required_perms = () required_perms = ()
accept_states = ('RUNNING', ) accept_states = ('RUNNING', )
resultant_state = 'SUSPENDED' resultant_state = 'SUSPENDED'
async_queue = "localhost.man.slow"
def is_preferred(self): def is_preferred(self):
return (not self.instance.is_base and return (not self.instance.is_base and
...@@ -839,6 +843,7 @@ class FlushOperation(NodeOperation): ...@@ -839,6 +843,7 @@ class FlushOperation(NodeOperation):
description = _("Disable node and move all instances to other ones.") description = _("Disable node and move all instances to other ones.")
required_perms = () required_perms = ()
superuser_required = True superuser_required = True
async_queue = "localhost.man.slow"
def on_abort(self, activity, error): def on_abort(self, activity, error):
from manager.scheduler import TraitsUnsatisfiableException from manager.scheduler import TraitsUnsatisfiableException
......
description "CIRCLE manager"
start on runlevel [2345]
stop on runlevel [!2345]
pre-start script
start moncelery
start mancelery
start slowcelery
end script
post-stop script
stop moncelery
stop mancelery
stop slowcelery
end script
description "CIRCLE mancelery" description "CIRCLE mancelery for common jobs"
start on runlevel [2345]
stop on runlevel [!2345]
respawn respawn
respawn limit 30 30 respawn limit 30 30
setgid cloud setgid cloud
setuid cloud setuid cloud
...@@ -12,6 +10,6 @@ script ...@@ -12,6 +10,6 @@ script
cd /home/cloud/circle/circle cd /home/cloud/circle/circle
. /home/cloud/.virtualenvs/circle/bin/activate . /home/cloud/.virtualenvs/circle/bin/activate
. /home/cloud/.virtualenvs/circle/bin/postactivate . /home/cloud/.virtualenvs/circle/bin/postactivate
exec ./manage.py celery --app=manager.mancelery worker --autoreload --loglevel=info --hostname=mancelery -B -c 1 exec ./manage.py celery --app=manager.mancelery worker --autoreload --loglevel=info --hostname=mancelery -B -c 10
end script end script
description "CIRCLE moncelery for monitoring jobs"
respawn
respawn limit 30 30
setgid cloud
setuid cloud
script
cd /home/cloud/circle/circle
. /home/cloud/.virtualenvs/circle/bin/activate
. /home/cloud/.virtualenvs/circle/bin/postactivate
exec ./manage.py celery --app=manager.moncelery worker --autoreload --loglevel=info --hostname=moncelery -B -c 3
end script
description "CIRCLE mancelery for slow jobs"
respawn
respawn limit 30 30
setgid cloud
setuid cloud
script
cd /home/cloud/circle/circle
. /home/cloud/.virtualenvs/circle/bin/activate
. /home/cloud/.virtualenvs/circle/bin/postactivate
exec ./manage.py celery --app=manager.slowcelery worker --autoreload --loglevel=info --hostname=slowcelery -B -c 5
end script
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment