Commit 9fa75044 by Őry Máté

firewall: cleanup models

parent de7816ad
......@@ -2,86 +2,112 @@
from django.contrib.auth.models import User
from django.db import models
from django.forms import fields, ValidationError
from django.forms import ValidationError
from django.utils.translation import ugettext_lazy as _
from firewall.fields import *
from south.modelsinspector import add_introspection_rules
from django.core.validators import MinValueValidator, MaxValueValidator
import django.conf
from django.db.models.signals import post_save
import re
import random
settings = django.conf.settings.FIREWALL_SETTINGS
class Rule(models.Model):
"""
A rule of a packet filter, changing the behavior of a host, vlan or firewall.
A rule of a packet filter, changing the behavior of a host, vlan or
firewall.
Some rules accept or deny packets matching some criteria.
Others set address translation or other free-form iptables parameters.
"""
CHOICES_type = (('host', 'host'), ('firewall', 'firewall'),
('vlan', 'vlan'))
('vlan', 'vlan'))
CHOICES_proto = (('tcp', 'tcp'), ('udp', 'udp'), ('icmp', 'icmp'))
CHOICES_dir = (('0', 'out'), ('1', 'in'))
direction = models.CharField(max_length=1, choices=CHOICES_dir,
blank=False, verbose_name=_("direction"),
help_text=_("If the rule matches egress or ingress packets."))
blank=False, verbose_name=_("direction"),
help_text=_("If the rule matches egress "
"or ingress packets."))
description = models.TextField(blank=True,
help_text=_("Why is the rule needed, or how does it work."))
foreign_network = models.ForeignKey('VlanGroup', verbose_name=_("foreign network"),
help_text=_("The group of vlans the matching packet goes to (direction out) or from (in)."),
related_name="ForeignRules")
dport = models.IntegerField(blank=True, null=True, verbose_name=_("dest. port"),
validators=[MinValueValidator(1), MaxValueValidator(65535)],
help_text=_("Destination port number of packets that match."))
sport = models.IntegerField(blank=True, null=True, verbose_name=_("source port"),
validators=[MinValueValidator(1), MaxValueValidator(65535)],
help_text=_("Source port number of packets that match."))
help_text=_("Why is the rule needed, "
"or how does it work."))
foreign_network = models.ForeignKey(
'VlanGroup', verbose_name=_("foreign network"),
help_text=_("The group of vlans the matching packet goes to "
"(direction out) or from (in)."),
related_name="ForeignRules")
dport = models.IntegerField(
blank=True, null=True, verbose_name=_("dest. port"),
validators=[MinValueValidator(1), MaxValueValidator(65535)],
help_text=_("Destination port number of packets that match."))
sport = models.IntegerField(
blank=True, null=True, verbose_name=_("source port"),
validators=[MinValueValidator(1), MaxValueValidator(65535)],
help_text=_("Source port number of packets that match."))
proto = models.CharField(max_length=10, choices=CHOICES_proto,
blank=True, null=True, verbose_name=_("protocol"),
help_text=_("Protocol of packets that match."))
blank=True, null=True, verbose_name=_("protocol"),
help_text=_("Protocol of packets that match."))
extra = models.TextField(blank=True, verbose_name=_("extra arguments"),
help_text=_("Additional arguments passed literally to the iptables-rule."))
help_text=_("Additional arguments passed "
"literally to the iptables-rule."))
accept = models.BooleanField(default=False, verbose_name=_("accept"),
help_text=_("Accept the matching packets (or deny if not checked)."))
help_text=_("Accept the matching packets "
"(or deny if not checked)."))
owner = models.ForeignKey(User, blank=True, null=True,
verbose_name=_("owner"),
help_text=_("The user responsible for this rule."))
help_text=_("The user responsible for "
"this rule."))
r_type = models.CharField(max_length=10, verbose_name=_("Rule type"),
choices=CHOICES_type,
help_text=_("The type of entity the rule belongs to."))
help_text=_("The type of entity the rule "
"belongs to."))
nat = models.BooleanField(default=False, verbose_name=_("NAT"),
help_text=_("If network address translation shoud be done."))
help_text=_("If network address translation "
"shoud be done."))
nat_dport = models.IntegerField(blank=True, null=True,
help_text=_("Rewrite destination port number to."),
help_text=_(
"Rewrite destination port number to."),
validators=[MinValueValidator(1),
MaxValueValidator(65535)])
created_at = models.DateTimeField(auto_now_add=True, verbose_name=_("created at"))
modified_at = models.DateTimeField(auto_now=True, verbose_name=_("modified at"))
created_at = models.DateTimeField(
auto_now_add=True,
verbose_name=_("created at"))
modified_at = models.DateTimeField(
auto_now=True,
verbose_name=_("modified at"))
vlan = models.ForeignKey('Vlan', related_name="rules", blank=True,
null=True, verbose_name=_("vlan"),
help_text=_("Vlan the rule applies to (if type is vlan)."))
null=True, verbose_name=_("vlan"),
help_text=_("Vlan the rule applies to "
"(if type is vlan)."))
vlangroup = models.ForeignKey('VlanGroup', related_name="rules",
blank=True, null=True, verbose_name=_("vlan group"),
help_text=_("Group of vlans the rule applies to (if type is vlan)."))
blank=True, null=True, verbose_name=_(
"vlan group"),
help_text=_("Group of vlans the rule "
"applies to (if type is vlan)."))
host = models.ForeignKey('Host', related_name="rules", blank=True,
verbose_name=_('host'), null=True, help_text=_("Host the rule applies to (if type is host)."))
hostgroup = models.ForeignKey('Group', related_name="rules", verbose_name=_("host group"),
blank=True, null=True, help_text=_("Group of hosts the rule applies to (if type is host)."))
firewall = models.ForeignKey('Firewall', related_name="rules", verbose_name=_("firewall"),
help_text=_("Firewall the rule applies to (if type is firewall)."),
blank=True, null=True)
verbose_name=_('host'), null=True,
help_text=_("Host the rule applies to "
"(if type is host)."))
hostgroup = models.ForeignKey(
'Group', related_name="rules", verbose_name=_("host group"),
blank=True, null=True, help_text=_("Group of hosts the rule applies "
"to (if type is host)."))
firewall = models.ForeignKey(
'Firewall', related_name="rules", verbose_name=_("firewall"),
help_text=_("Firewall the rule applies to "
"(if type is firewall)."),
blank=True, null=True)
def __unicode__(self):
return self.desc()
def clean(self):
fields = [self.vlan, self.vlangroup, self.host, self.hostgroup,
self.firewall]
self.firewall]
selected_fields = [field for field in fields if field]
if len(selected_fields) > 1:
raise ValidationError(_('Only one field can be selected.'))
......@@ -90,9 +116,9 @@ class Rule(models.Model):
return u'[%(type)s] %(src)s ▸ %(dst)s %(para)s %(desc)s' % {
'type': self.r_type,
'src': (unicode(self.foreign_network) if self.direction == '1'
else self.r_type),
else self.r_type),
'dst': (self.r_type if self.direction == '1'
else unicode(self.foreign_network)),
else unicode(self.foreign_network)),
'para': ((("proto=%s " % self.proto) if self.proto else '') +
(("sport=%s " % self.sport) if self.sport else '') +
(("dport=%s " % self.dport) if self.dport else '')),
......@@ -101,12 +127,21 @@ class Rule(models.Model):
class Meta:
verbose_name = _("rule")
verbose_name_plural = _("rules")
ordering = ('r_type', 'direction', 'proto', 'sport', 'dport', 'nat_dport', 'host', )
ordering = (
'r_type',
'direction',
'proto',
'sport',
'dport',
'nat_dport',
'host',
)
class Vlan(models.Model):
vid = models.IntegerField(unique=True)
name = models.CharField(max_length=20, unique=True,
validators=[val_alfanum])
validators=[val_alfanum])
prefix4 = models.IntegerField(default=16)
prefix6 = models.IntegerField(default=80)
interface = models.CharField(max_length=20, unique=True)
......@@ -115,9 +150,9 @@ class Vlan(models.Model):
ipv4 = models.GenericIPAddressField(protocol='ipv4', unique=True)
ipv6 = models.GenericIPAddressField(protocol='ipv6', unique=True)
snat_ip = models.GenericIPAddressField(protocol='ipv4', blank=True,
null=True)
null=True)
snat_to = models.ManyToManyField('self', symmetrical=False, blank=True,
null=True)
null=True)
description = models.TextField(blank=True)
comment = models.TextField(blank=True)
domain = models.ForeignKey('Domain')
......@@ -136,10 +171,11 @@ class Vlan(models.Model):
def net_ipv4(self):
return self.net4 + "/" + unicode(self.prefix4)
class VlanGroup(models.Model):
name = models.CharField(max_length=20, unique=True)
vlans = models.ManyToManyField('Vlan', symmetrical=False, blank=True,
null=True)
null=True)
description = models.TextField(blank=True)
owner = models.ForeignKey(User, blank=True, null=True)
created_at = models.DateTimeField(auto_now_add=True)
......@@ -148,6 +184,7 @@ class VlanGroup(models.Model):
def __unicode__(self):
return self.name
class Group(models.Model):
name = models.CharField(max_length=20, unique=True)
description = models.TextField(blank=True)
......@@ -158,17 +195,18 @@ class Group(models.Model):
def __unicode__(self):
return self.name
class Host(models.Model):
hostname = models.CharField(max_length=40, unique=True,
validators=[val_alfanum])
validators=[val_alfanum])
reverse = models.CharField(max_length=40, validators=[val_domain],
blank=True, null=True)
blank=True, null=True)
mac = MACAddressField(unique=True)
ipv4 = models.GenericIPAddressField(protocol='ipv4', unique=True)
pub_ipv4 = models.GenericIPAddressField(protocol='ipv4', blank=True,
null=True)
null=True)
ipv6 = models.GenericIPAddressField(protocol='ipv6', unique=True,
blank=True, null=True)
blank=True, null=True)
shared_ip = models.BooleanField(default=False)
description = models.TextField(blank=True)
comment = models.TextField(blank=True)
......@@ -176,7 +214,7 @@ class Host(models.Model):
vlan = models.ForeignKey('Vlan')
owner = models.ForeignKey(User)
groups = models.ManyToManyField('Group', symmetrical=False, blank=True,
null=True)
null=True)
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
......@@ -190,18 +228,18 @@ class Host(models.Model):
if (not self.shared_ip and self.pub_ipv4 and Host.objects.
exclude(id=self.id).filter(pub_ipv4=self.pub_ipv4)):
raise ValidationError(_("If shared_ip has been checked, "
"pub_ipv4 has to be unique."))
"pub_ipv4 has to be unique."))
if Host.objects.exclude(id=self.id).filter(pub_ipv4=self.ipv4):
raise ValidationError(_("You can't use another host's NAT'd "
"address as your own IPv4."))
"address as your own IPv4."))
self.full_clean()
super(Host, self).save(*args, **kwargs)
if not id:
Record(domain=self.vlan.domain, host=self, type='A',
owner=self.owner).save()
owner=self.owner).save()
if self.ipv6:
Record(domain=self.vlan.domain, host=self, type='AAAA',
owner=self.owner).save()
owner=self.owner).save()
def enable_net(self):
self.groups.add(Group.objects.get(name="netezhet"))
......@@ -210,7 +248,8 @@ class Host(models.Model):
proto = "tcp" if proto == "tcp" else "udp"
if self.shared_ip:
used_ports = Rule.objects.filter(host__pub_ipv4=self.pub_ipv4,
nat=True, proto=proto).values_list('dport', flat=True)
nat=True, proto=proto
).values_list('dport', flat=True)
if public is None:
public = random.randint(1024, 21000)
......@@ -220,27 +259,28 @@ class Host(models.Model):
public = i
break
else:
raise ValidationError(_("Port %s %s is already in use.") %
(proto, public))
raise ValidationError(
_("Port %s %s is already in use.") %
(proto, public))
else:
if public < 1024:
raise ValidationError(_("Only ports above 1024 can be used."))
raise ValidationError(
_("Only ports above 1024 can be used."))
if public in used_ports:
raise ValidationError(_("Port %s %s is already in use.") %
(proto, public))
(proto, public))
vg = VlanGroup.objects.get(name=settings["default_vlangroup"])
rule = Rule(direction='1', owner=self.owner, dport=public,
proto=proto, nat=True, accept=True, r_type="host",
nat_dport=private, host=self, foreign_network=VlanGroup.
objects.get(name=settings["default_vlangroup"]))
proto=proto, nat=True, accept=True, r_type="host",
nat_dport=private, host=self, foreign_network=vg)
else:
if self.rules.filter(proto=proto, dport=public):
raise ValidationError(_("Port %s %s is already in use.") %
(proto, public))
(proto, public))
rule = Rule(direction='1', owner=self.owner, dport=public,
proto=proto, nat=False, accept=True, r_type="host",
host=self, foreign_network=VlanGroup.objects
proto=proto, nat=False, accept=True, r_type="host",
host=self, foreign_network=VlanGroup.objects
.get(name=settings["default_vlangroup"]))
rule.full_clean()
......@@ -249,10 +289,10 @@ class Host(models.Model):
def del_port(self, proto, private):
if self.shared_ip:
self.rules.filter(owner=self.owner, proto=proto, host=self,
nat_dport=private).delete()
nat_dport=private).delete()
else:
self.rules.filter(owner=self.owner, proto=proto, host=self,
dport=private).delete()
dport=private).delete()
def get_hostname(self, proto):
try:
......@@ -261,7 +301,7 @@ class Host(models.Model):
elif proto == 'ipv4':
if self.shared_ip:
res = Record.objects.filter(type='A',
address=self.pub_ipv4)
address=self.pub_ipv4)
else:
res = self.record_set.filter(type='A')
return unicode(res[0].get_data()['name'])
......@@ -291,7 +331,7 @@ class Host(models.Model):
'host': self.get_hostname(proto='ipv4'),
'port': public4,
}
if self.ipv6: # ipv6
if self.ipv6: # ipv6
forward['ipv6'] = {
'host': self.get_hostname(proto='ipv6'),
'port': public6,
......@@ -309,6 +349,7 @@ class Firewall(models.Model):
def __unicode__(self):
return self.name
class Domain(models.Model):
name = models.CharField(max_length=40, validators=[val_domain])
owner = models.ForeignKey(User)
......@@ -320,11 +361,12 @@ class Domain(models.Model):
def __unicode__(self):
return self.name
class Record(models.Model):
CHOICES_type = (('A', 'A'), ('CNAME', 'CNAME'), ('AAAA', 'AAAA'),
('MX', 'MX'), ('NS', 'NS'), ('PTR', 'PTR'), ('TXT', 'TXT'))
('MX', 'MX'), ('NS', 'NS'), ('PTR', 'PTR'), ('TXT', 'TXT'))
name = models.CharField(max_length=40, validators=[val_domain],
blank=True, null=True)
blank=True, null=True)
domain = models.ForeignKey('Domain')
host = models.ForeignKey('Host', blank=True, null=True)
type = models.CharField(max_length=6, choices=CHOICES_type)
......@@ -354,18 +396,18 @@ class Record(models.Model):
if self.host:
if self.type in ['A', 'AAAA']:
if self.address:
raise ValidationError(_("Can't specify address for "
"A or AAAA records if host is set!"))
raise ValidationError(_("Can't specify address for A "
"or AAAA records if host is set!"))
if self.name:
raise ValidationError(_("Can't specify name for "
"A or AAAA records if host is set!"))
raise ValidationError(_("Can't specify name for A "
"or AAAA records if host is set!"))
elif self.type == 'CNAME':
if not self.name:
raise ValidationError(_("Name must be specified for "
"CNAME records if host is set!"))
"CNAME records if host is set!"))
if self.address:
raise ValidationError(_("Can't specify address for "
"CNAME records if host is set!"))
"CNAME records if host is set!"))
else: # if self.host is None
if not self.address:
raise ValidationError(_("Address must be specified!"))
......@@ -380,8 +422,8 @@ class Record(models.Model):
mx = self.address.split(':', 1)
if not (len(mx) == 2 and mx[0].isdigit() and
domain_re.match(mx[1])):
raise ValidationError(_("Bad address format. "
"Should be: <priority>:<hostname>"))
raise ValidationError(_("Bad MX address format. "
"Should be: <priority>:<name>"))
else:
raise ValidationError(_("Unknown record type."))
......@@ -425,22 +467,29 @@ class Record(models.Model):
'ttl': self.ttl,
'address': address}
class Blacklist(models.Model):
CHOICES_type = (('permban', 'permanent ban'), ('tempban', 'temporary ban'), ('whitelist', 'whitelist'), ('tempwhite', 'tempwhite'))
CHOICES_type = (('permban', 'permanent ban'), ('tempban', 'temporary ban'),
('whitelist', 'whitelist'), ('tempwhite', 'tempwhite'))
ipv4 = models.GenericIPAddressField(protocol='ipv4', unique=True)
host = models.ForeignKey('Host', blank=True, null=True)
reason = models.TextField(blank=True)
snort_message = models.TextField(blank=True)
type = models.CharField(max_length=10, choices=CHOICES_type, default='tempban')
type = models.CharField(
max_length=10,
choices=CHOICES_type,
default='tempban')
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
def save(self, *args, **kwargs):
self.full_clean()
super(Blacklist, self).save(*args, **kwargs)
def __unicode__(self):
return self.ipv4
def send_task(sender, instance, created, **kwargs):
from firewall.tasks import ReloadTask
ReloadTask.apply_async(args=[sender.__name__])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment