Commit 5a892e40 by Dudás Ádám

firewall: moar readability

parent a47e41cb
......@@ -189,18 +189,17 @@ CELERY_ROUTES = {
}
store_settings = {
"basic_auth": "True",
"verify_ssl": "False",
"ssl_auth": "False",
"store_client_pass": "IQu8Eice",
"store_client_user": "admin",
"store_client_key": "/opt/webadmin/cloud/client.key",
"store_client_cert": "/opt/webadmin/cloud/client.crt",
"store_url": "http://localhost:9000",
"store_public": "store.ik.bme.hu",
"basic_auth": "True",
"verify_ssl": "False",
"ssl_auth": "False",
"store_client_pass": "IQu8Eice",
"store_client_user": "admin",
"store_client_key": "/opt/webadmin/cloud/client.key",
"store_client_cert": "/opt/webadmin/cloud/client.crt",
"store_url": "http://localhost:9000",
"store_public": "store.ik.bme.hu",
}
firewall_settings = {
"default_vlangroup": "publikus",
"reload_sleep": "10",
......
......@@ -13,7 +13,7 @@ class RecordInline(contrib.admin.TabularInline):
class HostAdmin(admin.ModelAdmin):
list_display = ('hostname', 'vlan', 'ipv4', 'ipv6', 'pub_ipv4', 'mac',
'shared_ip', 'owner', 'description', 'reverse', 'groups_l')
'shared_ip', 'owner', 'description', 'reverse', 'list_groups')
ordering = ('hostname', )
list_filter = ('owner', 'vlan', 'groups')
search_fields = ('hostname', 'description', 'ipv4', 'ipv6', 'mac')
......@@ -21,7 +21,7 @@ class HostAdmin(admin.ModelAdmin):
inlines = (RuleInline, RecordInline)
@staticmethod
def groups_l(instance):
def list_groups(instance):
"""Returns instance's groups' names as a comma-separated list."""
names = [group.name for group in instance.groups.all()]
return u', '.join(names)
......@@ -43,36 +43,39 @@ class RuleAdmin(admin.ModelAdmin):
list_filter = ('r_type', 'vlan', 'owner', 'direction', 'accept',
'proto', 'nat')
def color_desc(self, instance):
@staticmethod
def color_desc(instance):
"""Returns a colorful description of the instance."""
para = '</span>'
if instance.dport:
para = 'dport=%s %s' % (instance.dport, para)
if instance.sport:
para = 'sport=%s %s' % (instance.sport, para)
if instance.proto:
para = 'proto=%s %s' % (instance.proto, para)
para = u'<span style="color: #00FF00;">' + para
return (
u'<span style="color: #FF0000;">[%s]</span> ' % instance.r_type +
(u'%s<span style="color: #0000FF;"> ▸ </span>%s' %
((instance.foreign_network.name, instance.r_type)
if instance.direction == '1' else
(instance.r_type, instance.foreign_network.name))) +
' ' + para + ' ' + instance.description)
return (u'<span style="color: #FF0000;">[%(type)s]</span> '
u'%(src)s<span style="color: #0000FF;"> ▸ </span>%(dst)s '
u'%(para)s %(desc)s') % {
'type': instance.r_type,
'src': (instance.foreign_network.name
if instance.direction == '1' else instance.r_type),
'dst': (instance.r_type if instance.direction == '1'
else instance.foreign_network.name),
'para': (u'<span style="color: #00FF00;">' +
(('proto=%s ' % instance.proto)
if instance.proto else '') +
(('sport=%s ' % instance.sport)
if instance.sport else '') +
(('dport=%s ' % instance.dport)
if instance.dport else '') +
'</span>'),
'desc': instance.description}
color_desc.allow_tags = True
def vlan_l(self, instance):
@staticmethod
def vlan_l(instance):
"""Returns instance's VLANs' names as a comma-separated list."""
retval = []
for vlan in instance.foreign_network.vlans.all():
retval.append(vlan.name)
return u', '.join(retval)
names = [vlan.name for vlan in instance.foreign_network.vlans.all()]
return u', '.join(names)
def used_in(self, instance):
@staticmethod
def used_in(instance):
for field in [instance.vlan, instance.vlangroup, instance.host,
instance.hostgroup, instance.firewall]:
if field is not None:
if field:
return unicode(field) + ' ' + field._meta.object_name
......@@ -92,15 +95,15 @@ class DomainAdmin(admin.ModelAdmin):
class RecordAdmin(admin.ModelAdmin):
list_display = ('name_', 'type', 'address_', 'ttl', 'host', 'owner')
def address_(self, instance):
@staticmethod
def address_(instance):
a = instance.get_data()
if a:
return a['address']
return a['address'] if a else None
def name_(self, instance):
@staticmethod
def name_(instance):
a = instance.get_data()
if a:
return a['name']
return a['name'] if a else None
class BlacklistAdmin(admin.ModelAdmin):
list_display = ('ipv4', 'reason', 'created_at', 'modified_at')
......
......@@ -2,6 +2,7 @@ from django.core.exceptions import ValidationError
from django.forms import fields
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.utils.ipv6 import is_valid_ipv6_address
from south.modelsinspector import add_introspection_rules
import re
......@@ -35,26 +36,46 @@ class MACAddressField(models.Field):
add_introspection_rules([], ["firewall\.fields\.MACAddressField"])
def val_alfanum(value):
"""Check whether the parameter is a valid alphanumeric value."""
if alfanum_re.search(value) is None:
raise ValidationError(
_(u'%s - only letters, numbers, underscores and hyphens are '
'allowed!') % value)
"""Validate whether the parameter is a valid alphanumeric value."""
if alfanum_re.match(value) is None:
raise ValidationError(_(u'%s - only letters, numbers, underscores '
'and hyphens are allowed!') % value)
def is_valid_domain(value):
"""Check whether the parameter is a valid domain name."""
return domain_re.match(value) is not None
def val_domain(value):
"""Check wheter the parameter is a valid domin."""
if domain_re.search(value) is None:
raise ValidationError(_(u'%s - invalid domain') % value)
"""Validate whether the parameter is a valid domin name."""
if not is_valid_domain(value):
raise ValidationError(_(u'%s - invalid domain name') % value)
def is_valid_reverse_domain(value):
"""Check whether the parameter is a valid reverse domain name."""
return reverse_domain_re.match(value) is not None
def val_reverse_domain(value):
"""Check whether the parameter is a valid reverse domain."""
if not reverse_domain_re.search(value):
raise ValidationError(u'%s - reverse domain' % value)
"""Validate whether the parameter is a valid reverse domain name."""
if not is_valid_reverse_domain(value):
raise ValidationError(u'%s - invalid reverse domain name' % value)
def is_valid_ipv4_address(value):
"""Check whether the parameter is a valid IPv4 address."""
return ipv4_re.match(value) is not None
def val_ipv4(value):
"""Validate whether the parameter is a valid IPv4 address."""
if not is_valid_ipv4_address(value):
raise ValidationError(_(u'%s - not an IPv4 address') % value)
def val_ipv6(value):
"""Validate whether the parameter is a valid IPv6 address."""
if not is_valid_ipv6_address(value):
raise ValidationError(_(u'%s - not an IPv6 address') % value)
def ipv4_2_ipv6(ipv4):
"""Convert IPv4 address string to IPv6 address string."""
val_ipv4(ipv4)
m = ipv4_re.match(ipv4)
if m is None:
raise ValidationError(_(u'%s - not an IPv4 address') % ipv4)
return ("2001:738:2001:4031:%s:%s:%s:0" %
(m.group(1), m.group(2), m.group(3)))
......@@ -36,10 +36,11 @@ class firewall:
def iptables(self, s):
"""Append rule."""
"""Append rule to filter table."""
self.RULES.append(s)
def iptablesnat(self, s):
"""Append rule to NAT table."""
self.RULES_NAT.append(s)
def host2vlan(self, host, rule):
......
......@@ -8,7 +8,6 @@ from firewall.fields import *
from south.modelsinspector import add_introspection_rules
from django.core.validators import MinValueValidator, MaxValueValidator
from cloud.settings import firewall_settings as settings
from django.utils.ipv6 import is_valid_ipv6_address
from django.db.models.signals import post_save
import re
......@@ -54,27 +53,23 @@ class Rule(models.Model):
return self.desc()
def clean(self):
count = 0
for field in [self.vlan, self.vlangroup, self.host, self.hostgroup,
self.firewall]:
if field is None:
count = count + 1
if count != 4:
raise ValidationError('jaj')
fields = [self.vlan, self.vlangroup, self.host, self.hostgroup,
self.firewall]
selected_fields = [field for field in fields if field]
if len(selected_fields) > 1:
raise ValidationError(_('Only one field can be selected.'))
def desc(self):
para = u""
if(self.dport):
para = "dport=%s %s" % (self.dport, para)
if(self.sport):
para = "sport=%s %s" % (self.sport, para)
if(self.proto):
para = "proto=%s %s" % (self.proto, para)
return (u'[' + self.r_type + u'] ' +
(unicode(self.foreign_network) + u' ▸ ' + self.r_type
if self.direction == '1' else self.r_type + u' ▸ ' +
unicode(self.foreign_network)) + u' ' + para + u' ' +
self.description)
return u'[%(type)s] %(src)s ▸ %(dst)s %(para)s %(desc)s' % {
'type': self.r_type,
'src': (unicode(self.foreign_network) if self.direction == '1'
else self.r_type),
'dst': (self.r_type if self.direction == '1'
else unicode(self.foreign_network)),
'para': ((("proto=%s " % self.proto) if self.proto else '') +
(("sport=%s " % self.sport) if self.sport else '') +
(("dport=%s " % self.dport) if self.dport else '')),
'desc': self.description}
class Vlan(models.Model):
vid = models.IntegerField(unique=True)
......@@ -170,17 +165,15 @@ class Host(models.Model):
self.full_clean()
super(Host, self).save(*args, **kwargs)
if id is None:
Record(domain=self.vlan.domain, host=self, type='A',
t = 'A' if self.ipv6 else 'AAAA'
Record(domain=self.vlan.domain, host=self, type=t,
owner=self.owner).save()
if self.ipv6:
Record(domain=self.vlan.domain, host=self, type='AAAA',
owner=self.owner).save()
def enable_net(self):
self.groups.add(Group.objects.get(name="netezhet"))
def add_port(self, proto, public, private = 0):
proto = "tcp" if (proto == "tcp") else "udp"
proto = "tcp" if proto == "tcp" else "udp"
if self.shared_ip:
if public < 1024:
raise ValidationError(_("Only ports above 1024 can be used."))
......@@ -197,8 +190,9 @@ class Host(models.Model):
raise ValidationError(_("Port %s %s is already in use.") %
(proto, public))
rule = Rule(direction='1', owner=self.owner, dport=public,
proto=proto, nat=False, accept=True, r_type="host", host=self,
foreign_network=VlanGroup.objects.get(name=settings["default_vlangroup"]))
proto=proto, nat=False, accept=True, r_type="host",
host=self, foreign_network=VlanGroup.objects
.get(name=settings["default_vlangroup"]))
rule.full_clean()
rule.save()
......@@ -208,11 +202,10 @@ class Host(models.Model):
dport=public).delete()
def list_ports(self):
retval = []
for rule in self.rules.filter(owner=self.owner):
retval.append({'proto': rule.proto, 'public': rule.dport,
'private': rule.nat_dport})
return retval
return [{'proto': rule.proto,
'public': rule.dport,
'private': rule.nat_dport} for rule in
self.rules.filter(owner=self.owner)]
def get_fqdn(self):
return self.hostname + u'.' + unicode(self.vlan.domain)
......@@ -255,75 +248,90 @@ class Record(models.Model):
def desc(self):
a = self.get_data()
if a:
return a['name'] + u' ' + a['type'] + u' ' + a['address']
return '(empty)'
return (u' '.join([a['name'], a['type'], a['address']])
if a else _('(empty)'))
def save(self, *args, **kwargs):
self.full_clean()
super(Record, self).save(*args, **kwargs)
def clean(self):
if self.name and self.name.endswith(u'.'):
raise ValidationError(_("Domain can't be terminated with a dot."))
if self.host and self.type in ['CNAME', 'A', 'AAAA']:
if self.type == 'CNAME':
if not self.name or self.address:
raise ValidationError(_("Only the 'name' field should "
"be filled with a CNAME record if a host is "
"set."))
elif self.name or self.address:
raise ValidationError(_("'name' and 'address' can't be "
"specified with an A or AAAA record if a host is "
"set."))
else:
if not self.address:
raise ValidationError(_("'address' field must be filled."))
if self.name:
self.name = self.name.rstrip(".") # remove trailing dots
if self.host:
if self.type in ['A', 'AAAA']:
if self.address:
raise ValidationError(_("Can't specify address for "
"A or AAAA records if host is set!"))
if self.name:
raise ValidationError(_("Can't specify name for "
"A or AAAA records if host is set!"))
elif self.type == 'CNAME':
if self.name is None:
raise ValidationError(_("Name must be specified for "
"CNAME records if host is set!"))
if self.address:
raise ValidationError(_("Can't specify address for "
"CNAME records if host is set!"))
else: # if self.host is None
if self.address is None:
raise ValidationError(_("Address must be specified!"))
if self.type == 'A':
if not ipv4_re.match(self.address):
raise ValidationError(_("Not a valid IPv4 address."))
elif self.type in ['CNAME', 'NS', 'PTR', 'TXT']:
if not domain_re.match(self.address):
raise ValidationError(_("Not a valid domain."))
val_ipv4(self.address)
elif self.type == 'AAAA':
if not is_valid_ipv6_address(self.address):
raise ValidationError(_("Not a valid IPv6 address."))
val_ipv6(self.address)
elif self.type in ['CNAME', 'NS', 'PTR', 'TXT']:
val_domain(self.address)
elif self.type == 'MX':
mx = self.address.split(':', 1)
if not (len(mx) == 2 and mx[0].isdigit() and
domain_re.match(mx[1])):
raise ValidationError(_("Invalid address. "
"Valid format: <priority>:<hostname>"))
raise ValidationError(_("Bad address format. "
"Should be: <priority>:<hostname>"))
else:
raise ValidationError(_("Unknown record."))
raise ValidationError(_("Unknown record type."))
def get_data(self):
retval = { 'name': self.name, 'type': self.type, 'ttl': self.ttl,
'address': self.address }
if self.host and self.type in ['CNAME', 'A', 'AAAA']:
def __get_name(self):
if self.host:
if self.type in ['A', 'AAAA']:
return self.host.get_fqdn()
elif self.type == 'CNAME':
return self.name + '.' + unicode(self.domain)
else:
return self.name
else: # if self.host is None
if self.name is None:
return unicode(self.domain)
else:
return self.name + '.' + unicode(self.domain)
def __get_address(self):
if self.host:
if self.type == 'A':
retval['address'] = (self.host.pub_ipv4
return (self.host.pub_ipv4
if self.host.pub_ipv4 and not self.host.shared_ip
else self.host.ipv4)
retval['name'] = self.host.get_fqdn()
elif self.type == 'AAAA':
if not self.host.ipv6:
return None
retval['address'] = self.host.ipv6
retval['name'] = self.host.get_fqdn()
return self.host.ipv6
elif self.type == 'CNAME':
retval['address'] = self.host.get_fqdn()
retval['name'] = self.name + u'.' + unicode(self.domain)
else:
if not self.name:
retval['name'] = unicode(self.domain)
else:
retval['name'] = self.name + u'.' + unicode(self.domain)
if not (retval['address'] and retval['name']):
return self.host.get_fqdn()
# otherwise:
return self.address
def get_data(self):
name = __get_name()
address = __get_address()
if self.host and self.type == 'AAAA' and not self.host.ipv6:
return None
elif address is None or name is None:
return None
return retval
else:
return {'name': name,
'type': self.type,
'ttl': self.ttl,
'address': address}
class Blacklist(models.Model):
CHOICES_type = (('permban', 'permanent ban'), ('tempban', 'temporary ban'), ('whitelist', 'whitelist'))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment