Commit 437844f1 by Guba Sándor

storage: added queue checkig mechanism

parent dc4d0b54
......@@ -13,7 +13,7 @@ from sizefield.models import FileSizeField
from acl.models import AclBase
from .tasks import local_tasks, remote_tasks
from common.models import ActivityModel, activitycontextimpl
from common.models import ActivityModel, activitycontextimpl, WorkerNotFound
logger = logging.getLogger(__name__)
......@@ -36,7 +36,12 @@ class DataStore(Model):
return u'%s (%s)' % (self.name, self.path)
def get_remote_queue_name(self, queue_id):
return self.hostname + '.' + queue_id
logger.debug("Checking for storage queue %s.%s",
self.hostname, queue_id)
if local_tasks.check_queue(self.hostname, queue_id):
return self.hostname + '.' + queue_id
else:
raise WorkerNotFound()
class Disk(AclBase, TimeStampedModel):
......
......@@ -2,6 +2,25 @@ from manager.mancelery import celery
@celery.task
def check_queue(storage, queue_id):
''' Celery inspect job to check for active workers at queue_id
return True/False
'''
drivers = ['storage', 'download']
worker_list = [storage + "." + d for d in drivers]
queue_name = storage + "." + queue_id
# v is List of List of queues dict
active_queues = celery.control.inspect(worker_list).active_queues()
if active_queues is not None:
node_workers = [v for k, v in active_queues.iteritems()]
for worker in node_workers:
for queue in worker:
if queue['name'] == queue_name:
return True
return False
@celery.task
def deploy(disk, user):
disk.deploy(task_uuid=deploy.request.id, user=user)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment