Commit 3cafdeed by Bach Dániel

firewall: add exception handling to firewall tasks

parent 68b9fe68
...@@ -35,10 +35,12 @@ import django.conf ...@@ -35,10 +35,12 @@ import django.conf
from django.db.models.signals import post_save, post_delete from django.db.models.signals import post_save, post_delete
import random import random
from common.models import HumanSortField from common.models import method_cache, WorkerNotFound, HumanSortField
from firewall.tasks.local_tasks import reloadtask from firewall.tasks.local_tasks import reloadtask
from .iptables import IptRule from .iptables import IptRule
from acl.models import AclBase from acl.models import AclBase
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
settings = django.conf.settings.FIREWALL_SETTINGS settings = django.conf.settings.FIREWALL_SETTINGS
...@@ -802,6 +804,20 @@ class Firewall(models.Model): ...@@ -802,6 +804,20 @@ class Firewall(models.Model):
def __unicode__(self): def __unicode__(self):
return self.name return self.name
@method_cache(30)
def get_remote_queue_name(self, queue_id):
"""Returns the name of the remote celery queue for this node.
Throws Exception if there is no worker on the queue.
The result may include dead queues because of caching.
"""
from .tasks.remote_tasks import check_queue
if check_queue(self.name, queue_id, None):
return self.name + "." + queue_id
else:
raise WorkerNotFound()
class Domain(models.Model): class Domain(models.Model):
name = models.CharField(max_length=40, validators=[val_domain], name = models.CharField(max_length=40, validators=[val_domain],
......
...@@ -20,8 +20,10 @@ from socket import gethostname ...@@ -20,8 +20,10 @@ from socket import gethostname
import django.conf import django.conf
from django.core.cache import cache from django.core.cache import cache
from celery.exceptions import TimeoutError
from manager.mancelery import celery from manager.mancelery import celery
from common.models import WorkerNotFound
settings = django.conf.settings.FIREWALL_SETTINGS settings = django.conf.settings.FIREWALL_SETTINGS
logger = getLogger(__name__) logger = getLogger(__name__)
...@@ -36,19 +38,37 @@ def _apply_once(name, queues, task, data): ...@@ -36,19 +38,37 @@ def _apply_once(name, queues, task, data):
return return
cache.delete(lockname) cache.delete(lockname)
data = data()
for queue in queues: for queue in queues:
task.apply_async(args=data(), queue=queue) try:
logger.info("%s configuration is reloaded.", name) task.apply_async(args=data, queue=queue, expires=60).get(timeout=5)
logger.info("%s configuration is reloaded. (queue: %s)",
name, queue)
except TimeoutError as e:
logger.critical('%s (queue: %s)', e, queue)
except:
logger.critical('Unhandled exception: queue: %s data: %s',
queue, data, exc_info=True)
def get_firewall_queues():
from firewall.models import Firewall
retval = []
for fw in Firewall.objects.all():
try:
retval.append(fw.get_remote_queue_name('firewall'))
except WorkerNotFound:
logger.critical('Firewall %s is offline', fw.name)
return list(retval)
@celery.task(ignore_result=True) @celery.task(ignore_result=True)
def periodic_task(): def reloadtask_worker():
from firewall.fw import BuildFirewall, dhcp, dns, ipset, vlan from firewall.fw import BuildFirewall, dhcp, dns, ipset, vlan
from remote_tasks import (reload_dns, reload_dhcp, reload_firewall, from remote_tasks import (reload_dns, reload_dhcp, reload_firewall,
reload_firewall_vlan, reload_blacklist) reload_firewall_vlan, reload_blacklist)
firewall_queues = [("%s.firewall" % i) for i in firewall_queues = get_firewall_queues()
settings.get('firewall_queues', [gethostname()])]
dns_queues = [("%s.dns" % i) for i in dns_queues = [("%s.dns" % i) for i in
settings.get('dns_queues', [gethostname()])] settings.get('dns_queues', [gethostname()])]
...@@ -78,5 +98,5 @@ def reloadtask(type='Host', timeout=15): ...@@ -78,5 +98,5 @@ def reloadtask(type='Host', timeout=15):
}[type] }[type]
logger.info("Reload %s on next periodic iteration applying change to %s.", logger.info("Reload %s on next periodic iteration applying change to %s.",
", ".join(reload), type) ", ".join(reload), type)
for i in reload: if all(cache.add("%s_lock" % i, True, 30) for i in reload):
cache.add("%s_lock" % i, "true", 30) reloadtask_worker.apply_async(queue='localhost.man', countdown=5)
...@@ -18,6 +18,24 @@ ...@@ -18,6 +18,24 @@
from manager.mancelery import celery from manager.mancelery import celery
def check_queue(firewall, queue_id, priority):
''' Celery inspect job to check for active workers at queue_id
return True/False
'''
queue_name = firewall + "." + queue_id
if priority is not None:
queue_name = queue_name + "." + priority
inspect = celery.control.inspect()
inspect.timeout = 0.1
active_queues = inspect.active_queues()
if active_queues is None:
return False
queue_names = (queue['name'] for worker in active_queues.itervalues()
for queue in worker)
return queue_name in queue_names
@celery.task(name='firewall.reload_dns') @celery.task(name='firewall.reload_dns')
def reload_dns(data): def reload_dns(data):
pass pass
......
...@@ -45,11 +45,6 @@ celery.conf.update( ...@@ -45,11 +45,6 @@ celery.conf.update(
routing_key="monitor"), routing_key="monitor"),
), ),
CELERYBEAT_SCHEDULE={ CELERYBEAT_SCHEDULE={
'firewall.periodic_task': {
'task': 'firewall.tasks.local_tasks.periodic_task',
'schedule': timedelta(seconds=5),
'options': {'queue': 'localhost.man'}
},
'vm.update_domain_states': { 'vm.update_domain_states': {
'task': 'vm.tasks.local_periodic_tasks.update_domain_states', 'task': 'vm.tasks.local_periodic_tasks.update_domain_states',
'schedule': timedelta(seconds=10), 'schedule': timedelta(seconds=10),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment