Commit 89c7ede1 by Bach Dániel

fw: refactor fw.py

parent 6b20ceba
from firewall import models
import django.conf
import re import re
from datetime import datetime, timedelta from datetime import datetime, timedelta
from itertools import product
from .models import (Host, Rule, Vlan, Domain, Record, Blacklist, SwitchPort)
from .iptables import IptRule, IptChain
import django.conf
from django.db.models import Q from django.db.models import Q
from django.template import loader, Context
settings = django.conf.settings.FIREWALL_SETTINGS settings = django.conf.settings.FIREWALL_SETTINGS
class Firewall: class BuildFirewall:
def dportsport(self, rule, repl=True):
retval = ' '
if rule.proto == 'tcp' or rule.proto == 'udp':
retval = '-p %s ' % rule.proto
if rule.sport:
retval += ' --sport %s ' % rule.sport
if rule.dport:
retval += ' --dport %s ' % (rule.nat_dport
if (repl and rule.nat and rule.direction == '1')
else rule.dport)
elif rule.proto == 'icmp':
retval = '-p %s ' % rule.proto
return retval
def iptables(self, s):
"""Append rule to filter table."""
self.RULES.append(s)
def iptablesnat(self, s):
"""Append rule to NAT table."""
self.RULES_NAT.append(s)
def host2vlan(self, host, rule):
if not rule.foreign_network:
return
if self.proto == 6 and host.ipv6:
ipaddr = str(host.ipv6) + '/112'
else:
ipaddr = str(host.ipv4)
dport_sport = self.dportsport(rule)
for vlan in rule.foreign_network.vlans.all():
if rule.accept:
if rule.direction == '0' and vlan.name == 'PUB':
if rule.dport == 25:
self.iptables('-A PUB_OUT -s %s %s -p tcp '
'--dport 25 -j LOG_ACC' %
(ipaddr, rule.extra))
break
action = 'PUB_OUT'
else:
action = 'LOG_ACC'
else:
action = 'LOG_DROP'
if rule.direction == '1': # going TO host
self.iptables('-A %s_%s -d %s %s %s -g %s' %
(vlan.name, host.vlan.name, ipaddr, dport_sport,
rule.extra, action))
else:
self.iptables('-A %s_%s -s %s %s %s -g %s' %
(host.vlan.name, vlan.name, ipaddr, dport_sport,
rule.extra, action))
def fw2vlan(self, rule):
if not rule.foreign_network:
return
dport_sport = self.dportsport(rule)
for vlan in rule.foreign_network.vlans.all():
if rule.direction == '1': # going TO host
self.iptables('-A INPUT -i %s %s %s -g %s' %
(vlan.name, dport_sport, rule.extra,
'LOG_ACC' if rule.accept else 'LOG_DROP'))
else:
self.iptables('-A OUTPUT -o %s %s %s -g %s' %
(vlan.name, dport_sport, rule.extra,
'LOG_ACC' if rule.accept else 'LOG_DROP'))
def vlan2vlan(self, l_vlan, rule):
if not rule.foreign_network:
return
dport_sport = self.dportsport(rule) def __init__(self):
self.chains = {}
for vlan in rule.foreign_network.vlans.all(): def add_rules(self, *args, **kwargs):
if rule.accept: for chain_name, ipt_rule in kwargs.items():
if rule.direction == '0' and vlan.name == 'PUB': if chain_name not in self.chains:
action = 'PUB_OUT' self.create_chain(chain_name)
else: self.chains[chain_name].add(ipt_rule)
action = 'LOG_ACC'
else:
action = 'LOG_DROP'
if rule.direction == '1': # going TO host def create_chain(self, chain_name):
self.iptables('-A %s_%s %s %s -g %s' % self.chains[chain_name] = IptChain(name=chain_name)
(vlan.name, l_vlan.name, dport_sport,
rule.extra, action))
else:
self.iptables('-A %s_%s %s %s -g %s' % (l_vlan.name, vlan.name,
dport_sport,
rule.extra, action))
def prerun(self):
self.iptables('*filter')
self.iptables(':INPUT DROP [88:6448]')
self.iptables(':FORWARD DROP [0:0]')
self.iptables(':OUTPUT DROP [50:6936]')
# initialize logging
self.iptables('-N LOG_DROP')
# windows port scan are silently dropped
self.iptables('-A LOG_DROP -p tcp --dport 445 -j DROP')
self.iptables('-A LOG_DROP -p udp --dport 137 -j DROP')
self.iptables('-A LOG_DROP -j LOG --log-level 7 '
'--log-prefix "[ipt][drop]"')
self.iptables('-A LOG_DROP -j DROP')
self.iptables('-N LOG_ACC')
self.iptables('-A LOG_ACC -j LOG --log-level 7 '
'--log-prefix "[ipt][isok]"')
self.iptables('-A LOG_ACC -j ACCEPT')
self.iptables('-N PUB_OUT')
self.iptables('-A FORWARD -m set --match-set blacklist src,dst '
'-j DROP')
self.iptables('-A FORWARD -m state --state INVALID -g LOG_DROP')
self.iptables('-A FORWARD -m state --state ESTABLISHED,RELATED '
'-j ACCEPT')
self.iptables('-A FORWARD -p icmp --icmp-type echo-request '
'-g LOG_ACC')
self.iptables('-A INPUT -m set --match-set blacklist src -j DROP')
self.iptables('-A INPUT -m state --state INVALID -g LOG_DROP')
self.iptables('-A INPUT -i lo -j ACCEPT')
self.iptables('-A INPUT -m state --state ESTABLISHED,RELATED '
'-j ACCEPT')
self.iptables('-A OUTPUT -m state --state INVALID -g LOG_DROP')
self.iptables('-A OUTPUT -o lo -j ACCEPT')
self.iptables('-A OUTPUT -m state --state ESTABLISHED,RELATED '
'-j ACCEPT')
def postrun(self):
self.iptables('-A PUB_OUT -p tcp --dport 25 -j LOG_DROP')
self.iptables('-A PUB_OUT -p tcp --dport 445 -j LOG_DROP')
self.iptables('-A PUB_OUT -p udp --dport 445 -j LOG_DROP')
self.iptables('-A PUB_OUT -g LOG_ACC')
self.iptables('-A FORWARD -g LOG_DROP')
self.iptables('-A INPUT -g LOG_DROP')
self.iptables('-A OUTPUT -g LOG_DROP')
self.iptables('COMMIT')
def ipt_nat(self):
self.iptablesnat('*nat')
self.iptablesnat(':PREROUTING ACCEPT [0:0]')
self.iptablesnat(':INPUT ACCEPT [0:0]')
self.iptablesnat(':OUTPUT ACCEPT [1:708]')
self.iptablesnat(':POSTROUTING ACCEPT [1:708]')
def build_ipt_nat(self):
# portforward # portforward
for host in self.hosts.exclude(pub_ipv4=None): for rule in Rule.objects.filter(
for rule in host.rules.filter(nat=True, direction='1'): nat=True, direction='in').select_related('host'):
dport_sport = self.dportsport(rule, False) self.add_rules(PREROUTING=IptRule(
if host.vlan.snat_ip: priority=1000,
self.iptablesnat('-A PREROUTING -d %s %s %s -j DNAT ' dst=(rule.get_external_ipv4(), None),
'--to-destination %s:%s' % proto=rule.proto,
(host.pub_ipv4, dport_sport, rule.extra, dport=rule.get_external_port('ipv4'),
host.ipv4, rule.nat_dport)) extra='-j DNAT --to-destination %s:%s' % (rule.host.ipv4,
rule.dport)))
# rules for machines with dedicated public IP
for host in self.hosts.exclude(shared_ip=True): # default outbound NAT rules for VLANs
if host.pub_ipv4: for vl_in in Vlan.objects.exclude(
self.iptablesnat('-A PREROUTING -d %s -j DNAT ' snat_ip=None).prefetch_related('snat_to'):
'--to-destination %s' % for vl_out in vl_in.snat_to.all():
(host.pub_ipv4, host.ipv4)) self.add_rules(POSTROUTING=IptRule(
self.iptablesnat('-A POSTROUTING -s %s -j SNAT ' priority=1000,
'--to-source %s' % src=(vl_in.network4, None),
(host.ipv4, host.pub_ipv4)) extra='-o %s -j SNAT --to-source %s' % (
vl_out.name, vl_in.snat_ip)))
# default NAT rules for VLANs
for s_vlan in self.vlans:
if s_vlan.snat_ip:
for d_vlan in s_vlan.snat_to.all():
self.iptablesnat('-A POSTROUTING -s %s -o %s -j SNAT '
'--to-source %s' %
(str(s_vlan.network4), d_vlan.name,
s_vlan.snat_ip))
self.iptablesnat('COMMIT')
def ipt_filter(self):
self.prerun()
self.ipt_filter_firewall()
self.ipt_filter_zones()
self.ipt_filter_host_rules()
self.ipt_filter_vlan_rules()
self.ipt_filter_vlan_drop()
self.postrun()
if self.proto == 6: # remove ipv4-specific rules
ipv4_re = re.compile('([0-9]{1,3}\.){3}[0-9]{1,3}')
self.RULES = [x for x in self.RULES if not ipv4_re.search(x)]
self.RULES = [x.replace('icmp', 'icmpv6') for x in self.RULES]
def ipt_filter_firewall(self): def ipt_filter_firewall(self):
"""Build firewall's own rules.""" """Build firewall's own rules."""
for f in self.fw: for rule in Rule.objects.exclude(firewall=None).select_related(
for rule in f.rules.all(): 'foreign_network').prefetch_related('foreign_network__vlans'):
self.fw2vlan(rule) self.add_rules(**rule.get_ipt_rules())
def ipt_filter_zones(self):
"""Jumping to chains between zones."""
for s_vlan in self.vlans:
for d_vlan in self.vlans:
self.iptables('-N %s_%s' % (s_vlan.name, d_vlan.name))
self.iptables('-A FORWARD -i %s -o %s -g %s_%s' %
(s_vlan.name, d_vlan.name, s_vlan.name,
d_vlan.name))
def ipt_filter_host_rules(self): def ipt_filter_host_rules(self):
"""Build hosts' rules.""" """Build hosts' rules."""
for i_vlan in self.vlans: # host rules
for i_host in i_vlan.host_set.all(): for rule in Rule.objects.exclude(host=None).select_related(
for group in i_host.groups.all(): 'foreign_network', 'host',
for rule in group.rules.all(): 'host__vlan').prefetch_related('foreign_network__vlans'):
self.host2vlan(i_host, rule) self.add_rules(**rule.get_ipt_rules(rule.host))
for rule in i_host.rules.all(): # group rules
self.host2vlan(i_host, rule) for rule in Rule.objects.exclude(hostgroup=None).select_related(
'hostgroup', 'foreign_network').prefetch_related(
'hostgroup__host_set__vlan', 'foreign_network__vlans'):
for host in rule.hostgroup.host_set.all():
self.add_rules(**rule.get_ipt_rules(host))
def ipt_filter_vlan_rules(self): def ipt_filter_vlan_rules(self):
"""Enable communication between VLANs.""" """Enable communication between VLANs."""
for s_vlan in self.vlans: for rule in Rule.objects.exclude(vlan=None).select_related(
for rule in s_vlan.rules.all(): 'vlan', 'foreign_network').prefetch_related(
self.vlan2vlan(s_vlan, rule) 'foreign_network__vlans'):
self.add_rules(**rule.get_ipt_rules())
def ipt_filter_vlan_drop(self): def ipt_filter_vlan_drop(self):
"""Close intra-VLAN chains.""" """Close intra-VLAN chains."""
for s_vlan in self.vlans: for chain in self.chains.values():
for d_vlan in self.vlans: close_chain_rule = IptRule(priority=65534, action='LOG_DROP')
self.iptables('-A %s_%s -g LOG_DROP' % (s_vlan.name, chain.add(close_chain_rule)
d_vlan.name))
def __init__(self, proto=4):
self.RULES = []
self.RULES_NAT = []
self.proto = proto
self.vlans = models.Vlan.objects.all()
self.hosts = models.Host.objects.all()
self.fw = models.Firewall.objects.all()
self.ipt_filter()
if self.proto != 6:
self.ipt_nat()
def get(self):
if self.proto == 6:
return {'filter': self.RULES, }
else:
return {'filter': self.RULES, 'nat': self.RULES_NAT}
def show(self): def ipt_filter_vlan_jump(self):
if self.proto == 6: """Create intra-VLAN jump rules."""
return '\n'.join(self.RULES) + '\n'
vlans = Vlan.objects.all().values_list('name', flat=True)
for vl_in, vl_out in product(vlans, repeat=2):
name = '%s_%s' % (vl_in, vl_out)
try:
chain = self.chains[name]
except KeyError:
pass
else: else:
return ('\n'.join(self.RULES) + '\n' + jump_rule = IptRule(priority=1, action=chain.name,
'\n'.join(self.RULES_NAT) + '\n') extra='-i %s -o %s' % (vl_in, vl_out))
self.add_rules(FORWARD=jump_rule)
def build_ipt(self):
"""Build rules."""
# TODO remove ipv4-specific rules
self.ipt_filter_firewall()
self.ipt_filter_host_rules()
self.ipt_filter_vlan_rules()
self.ipt_filter_vlan_jump()
self.ipt_filter_vlan_drop()
self.build_ipt_nat()
context = {
'filter': (chain for name, chain in self.chains.iteritems()
if chain.name not in ('PREROUTING', 'POSTROUTING')),
'nat': (chain for name, chain in self.chains.iteritems()
if chain.name in ('PREROUTING', 'POSTROUTING'))}
template = loader.get_template('firewall/iptables.conf')
context['proto'] = 'ipv4'
ipv4 = unicode(template.render(Context(context)))
context['proto'] = 'ipv6'
ipv6 = unicode(template.render(Context(context)))
return (ipv4, ipv6)
def ipset(): def ipset():
week = datetime.now() - timedelta(days=2) week = datetime.now() - timedelta(days=2)
filter_ban = (Q(type='tempban', modified_at__gte=week) | filter_ban = (Q(type='tempban', modified_at__gte=week) |
Q(type='permban')).values('ipv4', 'reason') Q(type='permban')).values('ipv4', 'reason')
return models.Blacklist.objects.filter(filter_ban) return Blacklist.objects.filter(filter_ban)
def ipv6_to_octal(ipv6): def ipv6_to_octal(ipv6):
...@@ -314,25 +160,21 @@ def ipv6_to_octal(ipv6): ...@@ -314,25 +160,21 @@ def ipv6_to_octal(ipv6):
def generate_ptr_records(): def generate_ptr_records():
DNS = [] DNS = []
for host in models.Host.objects.order_by('vlan').all(): for host in Host.objects.order_by('vlan').all():
rev = host.vlan.reverse_domain template = host.vlan.reverse_domain
ipv4 = str(host.pub_ipv4 if host.pub_ipv4 and i = host.get_external_ipv4().words
not host.shared_ip else host.ipv4) reverse = (host.reverse if host.reverse not in [None, '']
i = ipv4.split('.', 4) else host.get_fqdn())
reverse = (host.reverse if host.reverse and
len(host.reverse) else host.get_fqdn())
# ipv4 # ipv4
if host.ipv4: if host.ipv4:
DNS.append("^%s:%s:%s" % ( fqdn = template % {'a': i[0], 'b': i[1], 'c': i[2], 'd': i[3]}
(rev % {'a': int(i[0]), 'b': int(i[1]), 'c': int(i[2]), DNS.append("^%s:%s:%s" % (fqdn, reverse, settings['dns_ttl']))
'd': int(i[3])}),
reverse, models.settings['dns_ttl']))
# ipv6 # ipv6
if host.ipv6: if host.ipv6:
DNS.append("^%s:%s:%s" % (host.ipv6.reverse_dns, DNS.append("^%s:%s:%s" % (host.ipv6.reverse_dns,
reverse, models.settings['dns_ttl'])) reverse, settings['dns_ttl']))
return DNS return DNS
...@@ -341,30 +183,27 @@ def txt_to_octal(txt): ...@@ -341,30 +183,27 @@ def txt_to_octal(txt):
def generate_records(): def generate_records():
DNS = [] types = {'A': '+%(fqdn)s:%(address)s:%(ttl)s',
'AAAA': ':%(fqdn)s:28:%(octal)s:%(ttl)s',
for r in models.Record.objects.all(): 'NS': '&%(fqdn)s::%(address)s:%(ttl)s',
if r.type == 'A': 'CNAME': 'C%(fqdn)s:%(address)s:%(ttl)s',
DNS.append("+%s:%s:%s" % (r.fqdn, r.address, r.ttl)) 'MX': '@%(fqdn)s::%(address)s:%(dist)s:%(ttl)s',
elif r.type == 'AAAA': 'PTR': '^%(fqdn)s:%(address)s:%(ttl)s',
DNS.append(":%s:28:%s:%s" % 'TXT': '%(fqdn)s:%(octal)s:%(ttl)s'}
(r.fqdn, ipv6_to_octal(r.address), r.ttl))
elif r.type == 'NS': retval = []
DNS.append("&%s::%s:%s" % (r.fqdn, r.address, r.ttl))
elif r.type == 'CNAME': for r in Record.objects.all():
DNS.append("C%s:%s:%s" % (r.fqdn, r.address, r.ttl)) params = {'fqdn': r.fqdn, 'address': r.address, 'ttl': r.ttl}
elif r.type == 'MX': if r.type == 'MX':
mx = r.address.split(':', 2) params['address'], params['dist'] = r.address.split(':', 2)
DNS.append("@%(fqdn)s::%(mx)s:%(dist)s:%(ttl)s" % if r.type == 'AAAA':
{'fqdn': r.fqdn, 'mx': mx[1], 'dist': mx[0], params['octal'] = ipv6_to_octal(r.address)
'ttl': r.ttl}) if r.type == 'TXT':
elif r.type == 'PTR': params['octal'] = txt_to_octal(r.address)
DNS.append("^%s:%s:%s" % (r.fqdn, r.address, r.ttl)) retval.append(types[r.type] % params)
elif r.type == 'TXT':
DNS.append("'%s:%s:%s" % (r.fqdn,
txt_to_octal(r.address), r.ttl))
return DNS return retval
def dns(): def dns():
...@@ -374,10 +213,10 @@ def dns(): ...@@ -374,10 +213,10 @@ def dns():
DNS += generate_ptr_records() DNS += generate_ptr_records()
# domain SOA record # domain SOA record
for domain in models.Domain.objects.all(): for domain in Domain.objects.all():
DNS.append("Z%s:%s:support.ik.bme.hu::::::%s" % DNS.append("Z%s:%s:support.ik.bme.hu::::::%s" %
(domain.name, settings['dns_hostname'], (domain.name, settings['dns_hostname'],
models.settings['dns_ttl'])) settings['dns_ttl']))
# records # records
DNS += generate_records() DNS += generate_records()
...@@ -392,7 +231,7 @@ def dhcp(): ...@@ -392,7 +231,7 @@ def dhcp():
# /tools/dhcp3/dhcpd.conf.generated # /tools/dhcp3/dhcpd.conf.generated
for i_vlan in models.Vlan.objects.all(): for i_vlan in Vlan.objects.all():
if(i_vlan.dhcp_pool): if(i_vlan.dhcp_pool):
m = regex.search(i_vlan.dhcp_pool) m = regex.search(i_vlan.dhcp_pool)
if(m or i_vlan.dhcp_pool == "manual"): if(m or i_vlan.dhcp_pool == "manual"):
...@@ -437,14 +276,14 @@ def dhcp(): ...@@ -437,14 +276,14 @@ def dhcp():
def vlan(): def vlan():
obj = models.Vlan.objects.values('vid', 'name', 'network4', 'network6') obj = Vlan.objects.values('vid', 'name', 'network4', 'network6')
retval = {x['name']: {'tag': x['vid'], retval = {x['name']: {'tag': x['vid'],
'type': 'internal', 'type': 'internal',
'interfaces': [x['name']], 'interfaces': [x['name']],
'addresses': [str(x['network4']), 'addresses': [str(x['network4']),
str(x['network6'])]} str(x['network6'])]}
for x in obj} for x in obj}
for p in models.SwitchPort.objects.all(): for p in SwitchPort.objects.all():
eth_count = p.ethernet_devices.count() eth_count = p.ethernet_devices.count()
if eth_count > 1: if eth_count > 1:
name = 'bond%d' % p.id name = 'bond%d' % p.id
......
import logging
import re
from collections import OrderedDict
logger = logging.getLogger()
ipv4_re = re.compile(
r'^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}')
class InvalidRuleExcepion(Exception):
pass
class IptRule(object):
def __init__(self, priority=1000, action=None, src=None, dst=None,
proto=None, sport=None, dport=None, extra=None):
if proto not in ['tcp', 'udp', 'icmp', None]:
raise InvalidRuleExcepion()
if proto not in ['tcp', 'udp'] and (sport is not None or
dport is not None):
raise InvalidRuleExcepion()
self.priority = int(priority)
self.action = action
(self.src4, self.src6) = (None, None)
if isinstance(src, tuple):
(self.src4, self.src6) = src
(self.dst4, self.dst6) = (None, None)
if isinstance(dst, tuple):
(self.dst4, self.dst6) = dst
self.proto = proto
self.sport = sport
self.dport = dport
self.extra = extra
self.ipv4_only = extra and bool(ipv4_re.search(extra))
def __hash__(self):
return hash(frozenset(self.__dict__.items()))
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __lt__(self, other):
return self.priority < other.priority
def __repr__(self):
return '<IptRule: @%d %s >' % (self.priority, self.compile())
def __unicode__(self):
return self.__repr__()
def compile(self, proto='ipv4'):
opts = OrderedDict([('src4' if proto == 'ipv4' else 'src6', '-s %s'),
('dst4' if proto == 'ipv4' else 'dst6', '-d %s'),
('proto', '-p %s'),
('sport', '--sport %s'),
('dport', '--dport %s'),
('extra', '%s'),
('action', '-g %s')])
params = [opts[param] % getattr(self, param)
for param in opts
if getattr(self, param) is not None]
return ' '.join(params)
class IptChain(object):
builtin_chains = ('FORWARD', 'INPUT', 'OUTPUT', 'PREROUTING',
'POSTROUTING')
def __init__(self, name):
self.rules = set()
self.name = name
def add(self, *args, **kwargs):
for rule in args:
self.rules.add(rule)
def sort(self):
return sorted(list(self.rules))
def __len__(self):
return len(self.rules)
def __repr__(self):
return '<IptChain: %s %s>' % (self.name, self.rules)
def __unicode__(self):
return self.__repr__()
def compile(self, proto='ipv4'):
assert proto in ('ipv4', 'ipv6')
prefix = '-A %s ' % self.name
return '\n'.join([prefix + rule.compile(proto)
for rule in self.sort()
if not (proto == 'ipv6' and rule.ipv4_only)])
{% if nat %}
*nat
:PREROUTING ACCEPT [0:0]
:INPUT ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
:POSTROUTING ACCEPT [0:0]
{% for chain in nat %}
{{ chain.compile }}
{% endfor %}
COMMIT
{% endif %}
*filter
:INPUT DROP [0:0]
:FORWARD DROP [0:0]
:OUTPUT DROP [0:0]
# initialize logging
-N LOG_DROP
# windows port scan are silently dropped
-A LOG_DROP -p tcp --dport 445 -j DROP
-A LOG_DROP -p udp --dport 137 -j DROP
-A LOG_DROP -j LOG --log-level 7 --log-prefix "[ipt][drop]"
-A LOG_DROP -j DROP
-N LOG_ACC
-A LOG_ACC -j LOG --log-level 7 --log-prefix "[ipt][isok]"
-A LOG_ACC -j ACCEPT
# initialize FORWARD chain
-A FORWARD -m set --match-set blacklist src,dst -j DROP
-A FORWARD -m state --state INVALID -g LOG_DROP
-A FORWARD -m state --state ESTABLISHED,RELATED -j ACCEPT
-A FORWARD -p icmp --icmp-type echo-request -g LOG_ACC
# initialize INPUT chain
-A INPUT -m set --match-set blacklist src -j DROP
-A INPUT -m state --state INVALID -g LOG_DROP
-A INPUT -i lo -j ACCEPT
-A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
# initialize OUTPUT chain
-A OUTPUT -m state --state INVALID -g LOG_DROP
-A OUTPUT -o lo -j ACCEPT
-A OUTPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
{% for chain in filter %}
{% if chain.name not in chain.builtin_chains %}-N {{ chain.name }}{% endif %}
{{ chain.compile }}
{% endfor %}
# close all chains
-A FORWARD -g LOG_DROP
-A INPUT -g LOG_DROP
-A OUTPUT -g LOG_DROP
COMMIT
...@@ -5,6 +5,7 @@ from django.contrib.auth.models import User ...@@ -5,6 +5,7 @@ from django.contrib.auth.models import User
from ..admin import HostAdmin from ..admin import HostAdmin
from firewall.models import Vlan, Domain, Record, Host from firewall.models import Vlan, Domain, Record, Host
from django.forms import ValidationError from django.forms import ValidationError
from ..iptables import IptRule, IptChain, InvalidRuleExcepion
class MockInstance: class MockInstance:
...@@ -109,3 +110,50 @@ class HostGetHostnameTestCase(TestCase): ...@@ -109,3 +110,50 @@ class HostGetHostnameTestCase(TestCase):
self.r.save() self.r.save()
self.assertEqual(self.h.get_hostname(proto='ipv4', public=True), self.assertEqual(self.h.get_hostname(proto='ipv4', public=True),
self.r.fqdn) self.r.fqdn)
class IptablesTestCase(TestCase):
def setUp(self):
self.r = [IptRule(priority=4, action='ACCEPT',
src=('127.0.0.4', None)),
IptRule(priority=4, action='ACCEPT',
src=('127.0.0.4', None)),
IptRule(priority=2, action='ACCEPT',
dst=('127.0.0.2', None),
extra='-p icmp'),
IptRule(priority=6, action='ACCEPT',
dst=('127.0.0.6', None),
proto='tcp', dport=80),
IptRule(priority=1, action='ACCEPT',
dst=('127.0.0.1', None),
proto='udp', dport=53),
IptRule(priority=5, action='ACCEPT',
dst=('127.0.0.5', None),
proto='tcp', dport=443),
IptRule(priority=2, action='ACCEPT',
dst=('127.0.0.2', None),
proto='icmp'),
IptRule(priority=6, action='ACCEPT',
dst=('127.0.0.6', None),
proto='tcp', dport='1337')]
def test_chain_add(self):
ch = IptChain(name='test')
ch.add(*self.r)
self.assertEqual(len(ch), len(self.r) - 1)
def test_rule_compile_ok(self):
self.assertEqual(self.r[5].compile(),
'-d 127.0.0.5 -p tcp --dport 443 -g ACCEPT')
def test_rule_compile_fail(self):
self.assertRaises(InvalidRuleExcepion,
IptRule, **{'priority': 5, 'action': 'ACCEPT',
'dst': '127.0.0.5',
'proto': 'icmp', 'dport': 443})
def test_chain_compile(self):
ch = IptChain(name='test')
ch.add(*self.r)
compiled = ch.compile()
self.assertEqual(len(compiled.splitlines()), len(ch))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment