Commit a14ce987 by Őry Máté

copy new modules from circle2

parent d40a8aad
from os import environ
from os.path import abspath, basename, dirname, join, normpath
from json import loads
from socket import SOCK_STREAM
from sys import path
# Apps specific for this project go here.
# See:
WSGI_APPLICATION = '%s.wsgi.application' % SITE_NAME
# -*- coding: utf8 -*-
from django.contrib import admin
from firewall.models import *
from django import contrib
class RuleInline(contrib.admin.TabularInline):
model = Rule
class RecordInline(contrib.admin.TabularInline):
model = Record
class HostAdmin(admin.ModelAdmin):
list_display = ('hostname', 'vlan', 'ipv4', 'ipv6', 'pub_ipv4', 'mac',
'shared_ip', 'owner', 'description', 'reverse', 'list_groups')
ordering = ('hostname', )
list_filter = ('owner', 'vlan', 'groups')
search_fields = ('hostname', 'description', 'ipv4', 'ipv6', 'mac')
filter_horizontal = ('groups', )
inlines = (RuleInline, RecordInline)
def list_groups(instance):
"""Returns instance's groups' names as a comma-separated list."""
names = [ for group in instance.groups.all()]
return u', '.join(names)
class HostInline(contrib.admin.TabularInline):
model = Host
fields = ('hostname', 'ipv4', 'ipv6', 'pub_ipv4', 'mac', 'shared_ip',
'owner', 'reverse')
class VlanAdmin(admin.ModelAdmin):
list_display = ('vid', 'name', 'ipv4', 'net_ipv4', 'ipv6', 'net_ipv6',
'description', 'domain', 'snat_ip', )
ordering = ('vid', )
inlines = (RuleInline, )
class RuleAdmin(admin.ModelAdmin):
list_display = ('r_type', 'color_desc', 'owner', 'extra', 'direction',
'accept', 'proto', 'sport', 'dport', 'nat', 'nat_dport', 'used_in')
list_filter = ('r_type', 'vlan', 'owner', 'direction', 'accept',
'proto', 'nat')
def color_desc(self, instance):
"""Returns a colorful description of the instance."""
return (u'<span style="color: #FF0000;">[%(type)s]</span> '
u'%(src)s<span style="color: #0000FF;"> ▸ </span>%(dst)s '
u'%(para)s %(desc)s') % {
'type': instance.r_type,
'src': (
if instance.direction == '1' else instance.r_type),
'dst': (instance.r_type if instance.direction == '1'
'para': (u'<span style="color: #00FF00;">' +
(('proto=%s ' % instance.proto)
if instance.proto else '') +
(('sport=%s ' %
if else '') +
(('dport=%s ' % instance.dport)
if instance.dport else '') +
'desc': instance.description}
color_desc.allow_tags = True
def vlan_l(instance):
"""Returns instance's VLANs' names as a comma-separated list."""
names = [ for vlan in instance.foreign_network.vlans.all()]
return u', '.join(names)
def used_in(instance):
for field in [instance.vlan, instance.vlangroup,,
instance.hostgroup, instance.firewall]:
if field:
return unicode(field) + ' ' + field._meta.object_name
class AliasAdmin(admin.ModelAdmin):
list_display = ('alias', 'host')
class GroupAdmin(admin.ModelAdmin):
list_display = ('name', 'owner', 'description')
inlines = (RuleInline, )
class FirewallAdmin(admin.ModelAdmin):
inlines = (RuleInline, )
class DomainAdmin(admin.ModelAdmin):
list_display = ('name', 'owner')
class RecordAdmin(admin.ModelAdmin):
list_display = ('name_', 'type', 'address_', 'ttl', 'host', 'owner')
def address_(instance):
a = instance.get_data()
return a['address'] if a else None
def name_(instance):
a = instance.get_data()
return a['name'] if a else None
class BlacklistAdmin(admin.ModelAdmin):
list_display = ('ipv4', 'reason', 'created_at', 'modified_at'), HostAdmin), VlanAdmin), RuleAdmin), GroupAdmin), FirewallAdmin), DomainAdmin), RecordAdmin), BlacklistAdmin)
from django.core.exceptions import ValidationError
from django.forms import fields
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.utils.ipv6 import is_valid_ipv6_address
from south.modelsinspector import add_introspection_rules
import re
mac_re = re.compile(r'^([0-9a-fA-F]{2}([:-]?|$)){6}$')
alfanum_re = re.compile(r'^[A-Za-z0-9_-]+$')
domain_re = re.compile(r'^([A-Za-z0-9_-]\.?)+$')
ipv4_re = re.compile('^[0-9]+\.([0-9]+)\.([0-9]+)\.([0-9]+)$')
reverse_domain_re = re.compile(r'^(%\([abcd]\)d|[a-z0-9.-])+$')
class MACAddressFormField(fields.RegexField):
default_error_messages = {
'invalid': _(u'Enter a valid MAC address.'),
def __init__(self, *args, **kwargs):
super(MACAddressFormField, self).__init__(mac_re, *args, **kwargs)
class MACAddressField(models.Field):
empty_strings_allowed = False
def __init__(self, *args, **kwargs):
kwargs['max_length'] = 17
super(MACAddressField, self).__init__(*args, **kwargs)
def get_internal_type(self):
return 'CharField'
def formfield(self, **kwargs):
defaults = {'form_class': MACAddressFormField}
return super(MACAddressField, self).formfield(**defaults)
add_introspection_rules([], ["firewall\.fields\.MACAddressField"])
def val_alfanum(value):
"""Validate whether the parameter is a valid alphanumeric value."""
if not alfanum_re.match(value):
raise ValidationError(_(u'%s - only letters, numbers, underscores '
'and hyphens are allowed!') % value)
def is_valid_domain(value):
"""Check whether the parameter is a valid domain name."""
return domain_re.match(value) is not None
def val_domain(value):
"""Validate whether the parameter is a valid domin name."""
if not is_valid_domain(value):
raise ValidationError(_(u'%s - invalid domain name') % value)
def is_valid_reverse_domain(value):
"""Check whether the parameter is a valid reverse domain name."""
return reverse_domain_re.match(value) is not None
def val_reverse_domain(value):
"""Validate whether the parameter is a valid reverse domain name."""
if not is_valid_reverse_domain(value):
raise ValidationError(u'%s - invalid reverse domain name' % value)
def is_valid_ipv4_address(value):
"""Check whether the parameter is a valid IPv4 address."""
return ipv4_re.match(value) is not None
def val_ipv4(value):
"""Validate whether the parameter is a valid IPv4 address."""
if not is_valid_ipv4_address(value):
raise ValidationError(_(u'%s - not an IPv4 address') % value)
def val_ipv6(value):
"""Validate whether the parameter is a valid IPv6 address."""
if not is_valid_ipv6_address(value):
raise ValidationError(_(u'%s - not an IPv6 address') % value)
def ipv4_2_ipv6(ipv4):
"""Convert IPv4 address string to IPv6 address string."""
m = ipv4_re.match(ipv4)
return ("2001:738:2001:4031:%s:%s:%s:0" %
from django.contrib import auth
from firewall import models
import os
import django.conf
import subprocess
import re
import json
from datetime import datetime, timedelta
from django.db.models import Q
settings = django.conf.settings.FIREWALL_SETTINGS
class Firewall:
RULES = None
vlans = None
dmz = None
pub = None
hosts = None
fw = None
def dportsport(self, rule, repl=True):
retval = ' '
if rule.proto == 'tcp' or rule.proto == 'udp':
retval = '-p %s ' % rule.proto
retval += ' --sport %s ' %
if rule.dport:
retval += ' --dport %s ' % (rule.nat_dport
if (repl and rule.nat and rule.direction == '1')
else rule.dport)
elif rule.proto == 'icmp':
retval = '-p %s ' % rule.proto
return retval
def iptables(self, s):
"""Append rule to filter table."""
def iptablesnat(self, s):
"""Append rule to NAT table."""
def host2vlan(self, host, rule):
if not rule.foreign_network:
if self.IPV6 and host.ipv6:
ipaddr = host.ipv6 + '/112'
ipaddr = host.ipv4
dport_sport = self.dportsport(rule)
for vlan in rule.foreign_network.vlans.all():
if rule.accept:
if rule.direction == '0' and == 'PUB':
if rule.dport == 25:
self.iptables('-A PUB_OUT -s %s %s -p tcp '
'--dport 25 -j LOG_ACC' %
(ipaddr, rule.extra))
action = 'PUB_OUT'
action = 'LOG_ACC'
action = 'LOG_DROP'
if rule.direction == '1': # going TO host
self.iptables('-A %s_%s -d %s %s %s -g %s' % (vlan,
host.vlan, ipaddr, dport_sport, rule.extra, action))
self.iptables('-A %s_%s -s %s %s %s -g %s' % (host.vlan,
vlan, ipaddr, dport_sport, rule.extra, action))
def fw2vlan(self, rule):
if not rule.foreign_network:
dport_sport = self.dportsport(rule)
for vlan in rule.foreign_network.vlans.all():
if rule.direction == '1': # going TO host
self.iptables('-A INPUT -i %s %s %s -g %s' %
(vlan.interface, dport_sport, rule.extra,
'LOG_ACC' if rule.accept else 'LOG_DROP'))
self.iptables('-A OUTPUT -o %s %s %s -g %s' %
(vlan.interface, dport_sport, rule.extra,
'LOG_ACC' if rule.accept else 'LOG_DROP'))
def vlan2vlan(self, l_vlan, rule):
if not rule.foreign_network:
dport_sport = self.dportsport(rule)
for vlan in rule.foreign_network.vlans.all():
if rule.accept:
if rule.direction == '0' and == 'PUB':
action = 'PUB_OUT'
action = 'LOG_ACC'
action = 'LOG_DROP'
if rule.direction == '1': # going TO host
self.iptables('-A %s_%s %s %s -g %s' % (vlan, l_vlan,
dport_sport, rule.extra, action))
self.iptables('-A %s_%s %s %s -g %s' % (l_vlan, vlan,
dport_sport, rule.extra, action))
def prerun(self):
self.iptables(':INPUT DROP [88:6448]')
self.iptables(':FORWARD DROP [0:0]')
self.iptables(':OUTPUT DROP [50:6936]')
# initialize logging
self.iptables('-N LOG_DROP')
# windows port scan are silently dropped
self.iptables('-A LOG_DROP -p tcp --dport 445 -j DROP')
self.iptables('-A LOG_DROP -p udp --dport 137 -j DROP')
self.iptables('-A LOG_DROP -j LOG --log-level 7 '
'--log-prefix "[ipt][drop]"')
self.iptables('-A LOG_DROP -j DROP')
self.iptables('-N LOG_ACC')
self.iptables('-A LOG_ACC -j LOG --log-level 7 '
'--log-prefix "[ipt][isok]"')
self.iptables('-A LOG_ACC -j ACCEPT')
self.iptables('-N PUB_OUT')
self.iptables('-A FORWARD -m set --match-set blacklist src,dst -j DROP')
self.iptables('-A FORWARD -m state --state INVALID -g LOG_DROP')
self.iptables('-A FORWARD -m state --state ESTABLISHED,RELATED '
'-j ACCEPT')
self.iptables('-A FORWARD -p icmp --icmp-type echo-request '
'-g LOG_ACC')
self.iptables('-A INPUT -m set --match-set blacklist src -j DROP')
self.iptables('-A INPUT -m state --state INVALID -g LOG_DROP')
self.iptables('-A INPUT -i lo -j ACCEPT')
self.iptables('-A INPUT -m state --state ESTABLISHED,RELATED '
'-j ACCEPT')
self.iptables('-A OUTPUT -m state --state INVALID -g LOG_DROP')
self.iptables('-A OUTPUT -o lo -j ACCEPT')
self.iptables('-A OUTPUT -m state --state ESTABLISHED,RELATED '
'-j ACCEPT')
def postrun(self):
self.iptables('-A PUB_OUT -s -p tcp --dport 25 '
'-j LOG_ACC')
self.iptables('-A PUB_OUT -s -p tcp --dport 445 '
'-j LOG_ACC')
self.iptables('-A PUB_OUT -p tcp --dport 25 -j LOG_DROP')
self.iptables('-A PUB_OUT -p tcp --dport 445 -j LOG_DROP')
self.iptables('-A PUB_OUT -p udp --dport 445 -j LOG_DROP')
self.iptables('-A PUB_OUT -g LOG_ACC')
self.iptables('-A FORWARD -g LOG_DROP')
self.iptables('-A INPUT -g LOG_DROP')
self.iptables('-A OUTPUT -g LOG_DROP')
def ipt_nat(self):
self.iptablesnat(':PREROUTING ACCEPT [0:0]')
self.iptablesnat(':INPUT ACCEPT [0:0]')
self.iptablesnat(':OUTPUT ACCEPT [1:708]')
self.iptablesnat(':POSTROUTING ACCEPT [1:708]')
# portforward
for host in self.hosts.exclude(pub_ipv4=None):
for rule in host.rules.filter(nat=True, direction='1'):
dport_sport = self.dportsport(rule, False)
if host.vlan.snat_ip:
self.iptablesnat('-A PREROUTING -d %s %s %s -j DNAT '
'--to-destination %s:%s' % (host.pub_ipv4,
dport_sport, rule.extra, host.ipv4,
# rules for machines with dedicated public IP
for host in self.hosts.exclude(shared_ip=True):
if host.pub_ipv4:
self.iptablesnat('-A PREROUTING -d %s -j DNAT '
'--to-destination %s' % (host.pub_ipv4, host.ipv4))
self.iptablesnat('-A POSTROUTING -s %s -j SNAT '
'--to-source %s' % (host.ipv4, host.pub_ipv4))
# default NAT rules for VLANs
for s_vlan in self.vlans:
if s_vlan.snat_ip:
for d_vlan in s_vlan.snat_to.all():
self.iptablesnat('-A POSTROUTING -s %s -o %s -j SNAT '
'--to-source %s' % (s_vlan.net_ipv4(),
d_vlan.interface, s_vlan.snat_ip))
# hard-wired rules
self.iptablesnat('-A POSTROUTING -s -o vlan0003 -j SNAT '
'--to-source') # man elerheto legyen
self.iptablesnat('-A POSTROUTING -s -o vlan0008 -j SNAT '
'--to-source') # wolf network for printing
self.iptablesnat('-A POSTROUTING -s -o vlan0002 -j SNAT '
'--to-source %s' % # kulonben nemmegy a du
def ipt_filter(self):
ipv4_re = re.compile('([0-9]{1,3}\.){3}[0-9]{1,3}')
# pre-run stuff
# firewall's own rules
for f in self.fw:
for rule in f.rules.all():
# zonak kozotti lancokra ugras
for s_vlan in self.vlans:
for d_vlan in self.vlans:
self.iptables('-N %s_%s' % (s_vlan, d_vlan))
self.iptables('-A FORWARD -i %s -o %s -g %s_%s' %
(s_vlan.interface, d_vlan.interface, s_vlan, d_vlan))
# hosts' rules
for i_vlan in self.vlans:
for i_host in i_vlan.host_set.all():
for group in i_host.groups.all():
for rule in group.rules.all():
self.host2vlan(i_host, rule)
for rule in i_host.rules.all():
self.host2vlan(i_host, rule)
# enable communication between VLANs
for s_vlan in self.vlans:
for rule in s_vlan.rules.all():
self.vlan2vlan(s_vlan, rule)
# zonak kozotti lancokat zarja le
for s_vlan in self.vlans:
for d_vlan in self.vlans:
self.iptables('-A %s_%s -g LOG_DROP' % (s_vlan, d_vlan))
# post-run stuff
if self.IPV6:
self.RULES = [x for x in self.RULES if not]
self.RULES = [x.replace('icmp', 'icmpv6') for x in self.RULES]
def __init__(self, IPV6=False):
self.IPV6 = IPV6
self.vlans = models.Vlan.objects.all()
self.hosts = models.Host.objects.all()
self.dmz = models.Vlan.objects.get(name='DMZ') = models.Vlan.objects.get(name='PUB')
self.fw = models.Firewall.objects.all()
if not self.IPV6:
def reload(self):
if self.IPV6:
process = subprocess.Popen(['/usr/bin/ssh', 'fw2',
'/usr/bin/sudo', '/sbin/ip6tables-restore', '-c'],
shell=False, stdin=subprocess.PIPE)
process.communicate('\n'.join(self.RULES) + '\n')
process = subprocess.Popen(['/usr/bin/ssh', 'fw2',
'/usr/bin/sudo', '/sbin/iptables-restore', '-c'],
shell=False, stdin=subprocess.PIPE)
process.communicate('\n'.join(self.RULES) + '\n' +
'\n'.join(self.RULES_NAT) + '\n')
def get(self):
if self.IPV6:
return { 'filter': self.RULES, }
return { 'filter': self.RULES, 'nat': self.RULES_NAT }
def show(self):
if self.IPV6:
return '\n'.join(self.RULES) + '\n'
return ('\n'.join(self.RULES) + '\n' +
'\n'.join(self.RULES_NAT) + '\n')
def ipset():
week =
return models.Blacklist.objects.filter(Q(type='tempban', modified_at__gte=week) | Q(type='permban')).values('ipv4', 'reason')
def ipv6_to_octal(ipv6):
while len(ipv6.split(':')) < 8:
ipv6 = ipv6.replace('::', ':::')
octets = []
for part in ipv6.split(':'):
if not part:
octets.extend([0, 0])
# Pad hex part to 4 digits.
part = '%04x' % int(part, 16)
octets.append(int(part[:2], 16))
octets.append(int(part[2:], 16))
return '\\' + '\\'.join(['%03o' % x for x in octets])
def ipv4_to_arpa(ipv4, cname=False):
m2 ='^([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)$', ipv4)
if cname:
return ('' %
return ('' %
def ipv6_to_arpa(ipv6):
while len(ipv6.split(':')) < 8:
ipv6 = ipv6.replace('::', ':::')
octets = []
for part in ipv6.split(':'):
if not part:
octets.extend([0, 0, 0, 0])
# Pad hex part to 4 digits.
part = '%04x' % int(part, 16)
octets.insert(0, int(part[0], 16))
octets.insert(0, int(part[1], 16))
octets.insert(0, int(part[2], 16))
octets.insert(0, int(part[3], 16))
return '.'.join(['%1x' % x for x in octets]) + ''
# =fqdn:ip:ttl A, PTR
# &fqdn:ip:x:ttl NS
# ZfqdnSOA
# +fqdn:ip:ttl A
# ^ PTR
# : generic
def dns():
vlans = models.Vlan.objects.all()
regex = re.compile(r'^([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)$')
DNS = []
for i_vlan in vlans:
m =
rev = i_vlan.reverse_domain
for i_host in i_vlan.host_set.all():
ipv4 = (i_host.pub_ipv4 if i_host.pub_ipv4 and
not i_host.shared_ip else i_host.ipv4)
i = ipv4.split('.', 4)
reverse = (i_host.reverse if i_host.reverse and
len(i_host.reverse) else i_host.get_fqdn())
# ipv4
if i_host.ipv4:
DNS.append("^%s:%s:%s" % (
(rev % { 'a': int(i[0]), 'b': int(i[1]), 'c': int(i[2]),
'd': int(i[3]) }),
reverse, models.settings['dns_ttl']))
# ipv6
if i_host.ipv6:
DNS.append("^%s:%s:%s" % (ipv6_to_arpa(i_host.ipv6),
reverse, models.settings['dns_ttl']))
for domain in models.Domain.objects.all():
DNS.append("" % (,
settings['dns_hostname'], models.settings['dns_ttl']))
for r in models.Record.objects.all():
d = r.get_data()
if d['type'] == 'A':
DNS.append("+%s:%s:%s" % (d['name'], d['address'], d['ttl']))
elif d['type'] == 'AAAA':
DNS.append(":%s:28:%s:%s" % (d['name'],
ipv6_to_octal(d['address']), d['ttl']))
elif d['type'] == 'NS':
DNS.append("&%s::%s:%s" % (d['name'], d['address'], d['ttl']))
elif d['type'] == 'CNAME':
DNS.append("C%s:%s:%s" % (d['name'], d['address'], d['ttl']))
elif d['type'] == 'MX':
mx = d['address'].split(':', 2)
DNS.append("@%(fqdn)s::%(mx)s:%(dist)s:%(ttl)s" %
{'fqdn': d['name'], 'mx': mx[1], 'dist': mx[0],
'ttl': d['ttl']})
elif d['type'] == 'PTR':
DNS.append("^%s:%s:%s" % (d['name'], d['address'], d['ttl']))
return DNS
process = subprocess.Popen(['/usr/bin/ssh', 'tinydns@%s' %
settings['dns_hostname']], shell=False, stdin=subprocess.PIPE)
# print "\n".join(DNS)+"\n"
def prefix_to_mask(prefix):
t = [0, 0, 0, 0]
for i in range(0, 4):
if prefix > i * 8 + 7:
t[i] = 255
elif i * 8 < prefix and prefix <= (i + 1) * 8:
t[i] = 256 - (2 ** ((i + 1) * 8 - prefix))
return ".".join([str(i) for i in t])
def dhcp():
vlans = models.Vlan.objects.all()
regex = re.compile(r'^([0-9]+)\.([0-9]+)\.[0-9]+\.[0-9]+\s+'
DHCP = []
# /tools/dhcp3/dhcpd.conf.generated
for i_vlan in vlans:
m =
if(m or i_vlan.dhcp_pool == "manual"):
DHCP.append ('''
# %(name)s - %(interface)s
subnet %(net)s netmask %(netmask)s {
option domain-name "%(domain)s";
option routers %(router)s;
option domain-name-servers %(dnsserver)s;
option ntp-servers %(ntp)s;
next-server %(tftp)s;
filename \"pxelinux.0\";
allow bootp; allow booting;
}''' % {
'net': i_vlan.net4,
'netmask': prefix_to_mask(i_vlan.prefix4),
'domain': i_vlan.domain,
'router': i_vlan.ipv4,
'ntp': i_vlan.ipv4,
'dnsserver': settings['rdns_ip'],
'extra': ("range %s" % i_vlan.dhcp_pool
if m else "deny unknown-clients"),
'interface': i_vlan.interface,
'tftp': i_vlan.ipv4
for i_host in i_vlan.host_set.all():
DHCP.append ('''
host %(hostname)s {
hardware ethernet %(mac)s;
fixed-address %(ipv4)s;
}''' % {
'hostname': i_host.hostname,
'mac': i_host.mac,
'ipv4': i_host.ipv4,
return DHCP
process = subprocess.Popen(['/usr/bin/ssh', 'fw2',
'cat > /tools/dhcp3/dhcpd.conf.generated;'
'sudo /etc/init.d/isc-dhcp-server restart'], shell=False,
# print "\n".join(DHCP)+"\n"
