Commit 6d9a0b97 by Guba Sándor

vm: rework cleanup to handle different queues

parent abcd5fd2
......@@ -20,7 +20,6 @@ from contextlib import contextmanager
from logging import getLogger
from warnings import warn
from celery.signals import worker_ready
from celery.contrib.abortable import AbortableAsyncResult
from django.core.urlresolvers import reverse
......@@ -278,17 +277,17 @@ def node_activity(code_suffix, node, task_uuid=None, user=None,
return activitycontextimpl(act)
@worker_ready.connect()
def cleanup(conf=None, **kwargs):
# TODO check if other manager workers are running
from celery.task.control import discard_all
discard_all()
msg_txt = ugettext_noop("Manager is restarted, activity is cleaned up. "
"You can try again now.")
message = create_readable(msg_txt, msg_txt)
queue_name = kwargs.get('queue_name', None)
for i in InstanceActivity.objects.filter(finished__isnull=True):
i.finish(False, result=message)
logger.error('Forced finishing stale activity %s', i)
op = i.get_operation()
if op and op.async_queue == queue_name:
i.finish(False, result=message)
logger.error('Forced finishing stale activity %s', i)
for i in NodeActivity.objects.filter(finished__isnull=True):
i.finish(False, result=message)
logger.error('Forced finishing stale activity %s', i)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment