Commit abcd5fd2 by Guba Sándor

manager: rework clean_up to match queue_name

parent 9ec3b364
...@@ -16,12 +16,15 @@ ...@@ -16,12 +16,15 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from celery import Celery from celery import Celery
from celery.signals import worker_ready
from datetime import timedelta from datetime import timedelta
from kombu import Queue, Exchange from kombu import Queue, Exchange
from os import getenv from os import getenv
HOSTNAME = "localhost" HOSTNAME = "localhost"
CACHE_URI = getenv("CACHE_URI", "pylibmc://127.0.0.1:11211/") CACHE_URI = getenv("CACHE_URI", "pylibmc://127.0.0.1:11211/")
QUEUE_NAME = HOSTNAME + '.man'
celery = Celery('manager', celery = Celery('manager',
broker=getenv("AMQP_URI"), broker=getenv("AMQP_URI"),
...@@ -57,3 +60,10 @@ celery.conf.update( ...@@ -57,3 +60,10 @@ celery.conf.update(
} }
) )
@worker_ready.connect()
def cleanup_tasks(conf=None, **kwargs):
'''Discard all task and clean up activity.'''
from vm.models.activity import cleanup
cleanup(queue_name=QUEUE_NAME)
...@@ -16,12 +16,14 @@ ...@@ -16,12 +16,14 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from celery import Celery from celery import Celery
from celery.signals import worker_ready
from datetime import timedelta from datetime import timedelta
from kombu import Queue, Exchange from kombu import Queue, Exchange
from os import getenv from os import getenv
HOSTNAME = "localhost" HOSTNAME = "localhost"
CACHE_URI = getenv("CACHE_URI", "pylibmc://127.0.0.1:11211/") CACHE_URI = getenv("CACHE_URI", "pylibmc://127.0.0.1:11211/")
QUEUE_NAME = HOSTNAME + '.monitor'
celery = Celery('monitor', celery = Celery('monitor',
broker=getenv("AMQP_URI"), broker=getenv("AMQP_URI"),
...@@ -34,7 +36,7 @@ celery.conf.update( ...@@ -34,7 +36,7 @@ celery.conf.update(
CELERY_CACHE_BACKEND=CACHE_URI, CELERY_CACHE_BACKEND=CACHE_URI,
CELERY_TASK_RESULT_EXPIRES=300, CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=( CELERY_QUEUES=(
Queue(HOSTNAME + '.monitor', Exchange('monitor', type='direct'), Queue(QUEUE_NAME, Exchange('monitor', type='direct'),
routing_key="monitor"), routing_key="monitor"),
), ),
CELERYBEAT_SCHEDULE={ CELERYBEAT_SCHEDULE={
...@@ -70,3 +72,10 @@ celery.conf.update( ...@@ -70,3 +72,10 @@ celery.conf.update(
} }
) )
@worker_ready.connect()
def cleanup_tasks(conf=None, **kwargs):
'''Discard all task and clean up activity.'''
from vm.models.activity import cleanup
cleanup(queue_name=QUEUE_NAME)
...@@ -16,12 +16,14 @@ ...@@ -16,12 +16,14 @@
# with CIRCLE. If not, see <http://www.gnu.org/licenses/>. # with CIRCLE. If not, see <http://www.gnu.org/licenses/>.
from celery import Celery from celery import Celery
from celery.signals import worker_ready
from datetime import timedelta from datetime import timedelta
from kombu import Queue, Exchange from kombu import Queue, Exchange
from os import getenv from os import getenv
HOSTNAME = "localhost" HOSTNAME = "localhost"
CACHE_URI = getenv("CACHE_URI", "pylibmc://127.0.0.1:11211/") CACHE_URI = getenv("CACHE_URI", "pylibmc://127.0.0.1:11211/")
QUEUE_NAME = HOSTNAME + '.man.slow'
celery = Celery('manager.slow', celery = Celery('manager.slow',
broker=getenv("AMQP_URI"), broker=getenv("AMQP_URI"),
...@@ -36,7 +38,7 @@ celery.conf.update( ...@@ -36,7 +38,7 @@ celery.conf.update(
CELERY_CACHE_BACKEND=CACHE_URI, CELERY_CACHE_BACKEND=CACHE_URI,
CELERY_TASK_RESULT_EXPIRES=300, CELERY_TASK_RESULT_EXPIRES=300,
CELERY_QUEUES=( CELERY_QUEUES=(
Queue(HOSTNAME + '.man.slow', Exchange('manager.slow', type='direct'), Queue(QUEUE_NAME, Exchange('manager.slow', type='direct'),
routing_key="manager.slow"), routing_key="manager.slow"),
), ),
CELERYBEAT_SCHEDULE={ CELERYBEAT_SCHEDULE={
...@@ -48,3 +50,10 @@ celery.conf.update( ...@@ -48,3 +50,10 @@ celery.conf.update(
} }
) )
@worker_ready.connect()
def cleanup_tasks(conf=None, **kwargs):
'''Discard all task and clean up activity.'''
from vm.models.activity import cleanup
cleanup(queue_name=QUEUE_NAME)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment