Commit 43391a3d by Őry Máté

firewall: merge from network-gui

parent a8c081dc
# -*- coding: utf8 -*-
# -*- coding: utf-8 -*-
from django.contrib import admin
from firewall.models import *
from firewall.models import (Rule, Host, Vlan, Group, VlanGroup, Firewall,
Domain, Record, Blacklist)
from django import contrib
class RuleInline(contrib.admin.TabularInline):
model = Rule
class RecordInline(contrib.admin.TabularInline):
model = Record
class HostAdmin(admin.ModelAdmin):
list_display = ('hostname', 'vlan', 'ipv4', 'ipv6', 'pub_ipv4', 'mac',
'shared_ip', 'owner', 'description', 'reverse', 'list_groups')
'shared_ip', 'owner', 'description', 'reverse',
ordering = ('hostname', )
list_filter = ('owner', 'vlan', 'groups')
search_fields = ('hostname', 'description', 'ipv4', 'ipv6', 'mac')
......@@ -26,42 +30,46 @@ class HostAdmin(admin.ModelAdmin):
names = [ for group in instance.groups.all()]
return u', '.join(names)
class HostInline(contrib.admin.TabularInline):
model = Host
fields = ('hostname', 'ipv4', 'ipv6', 'pub_ipv4', 'mac', 'shared_ip',
'owner', 'reverse')
'owner', 'reverse')
class VlanAdmin(admin.ModelAdmin):
list_display = ('vid', 'name', 'ipv4', 'net_ipv4', 'ipv6', 'net_ipv6',
'description', 'domain', 'snat_ip', )
'description', 'domain', 'snat_ip', )
ordering = ('vid', )
inlines = (RuleInline, )
class RuleAdmin(admin.ModelAdmin):
list_display = ('r_type', 'color_desc', 'owner', 'extra', 'direction',
'accept', 'proto', 'sport', 'dport', 'nat', 'nat_dport', 'used_in')
'accept', 'proto', 'sport', 'dport', 'nat',
'nat_dport', 'used_in')
list_filter = ('r_type', 'vlan', 'owner', 'direction', 'accept',
'proto', 'nat')
'proto', 'nat')
def color_desc(self, instance):
"""Returns a colorful description of the instance."""
return (u'<span style="color: #FF0000;">[%(type)s]</span> '
u'%(src)s<span style="color: #0000FF;"> ▸ </span>%(dst)s '
u'%(para)s %(desc)s') % {
'type': instance.r_type,
'src': (
if instance.direction == '1' else instance.r_type),
'dst': (instance.r_type if instance.direction == '1'
'para': (u'<span style="color: #00FF00;">' +
'type': instance.r_type,
'src': (
if instance.direction == '1' else instance.r_type),
'dst': (instance.r_type if instance.direction == '1'
'para': (u'<span style="color: #00FF00;">' +
(('proto=%s ' % instance.proto)
if instance.proto else '') +
(('sport=%s ' %
if else '') +
(('dport=%s ' % instance.dport)
if instance.dport else '') +
'desc': instance.description}
'desc': instance.description}
color_desc.allow_tags = True
......@@ -73,7 +81,7 @@ class RuleAdmin(admin.ModelAdmin):
def used_in(instance):
for field in [instance.vlan, instance.vlangroup,,
instance.hostgroup, instance.firewall]:
instance.hostgroup, instance.firewall]:
if field:
return unicode(field) + ' ' + field._meta.object_name
......@@ -81,16 +89,20 @@ class RuleAdmin(admin.ModelAdmin):
class AliasAdmin(admin.ModelAdmin):
list_display = ('alias', 'host')
class GroupAdmin(admin.ModelAdmin):
list_display = ('name', 'owner', 'description')
inlines = (RuleInline, )
class FirewallAdmin(admin.ModelAdmin):
inlines = (RuleInline, )
class DomainAdmin(admin.ModelAdmin):
list_display = ('name', 'owner')
class RecordAdmin(admin.ModelAdmin):
list_display = ('name_', 'type', 'address_', 'ttl', 'host', 'owner')
......@@ -104,6 +116,7 @@ class RecordAdmin(admin.ModelAdmin):
a = instance.get_data()
return a['name'] if a else None
class BlacklistAdmin(admin.ModelAdmin):
list_display = ('ipv4', 'reason', 'created_at', 'modified_at')
......@@ -116,4 +129,3 @@, FirewallAdmin), DomainAdmin), RecordAdmin), BlacklistAdmin)
......@@ -6,12 +6,14 @@ from django.utils.ipv6 import is_valid_ipv6_address
from south.modelsinspector import add_introspection_rules
import re
mac_re = re.compile(r'^([0-9a-fA-F]{2}([:-]?|$)){6}$')
mac_re = re.compile(r'^([0-9a-fA-F]{2}(:|$)){6}$')
alfanum_re = re.compile(r'^[A-Za-z0-9_-]+$')
domain_re = re.compile(r'^([A-Za-z0-9_-]\.?)+$')
ipv4_re = re.compile('^[0-9]+\.([0-9]+)\.([0-9]+)\.([0-9]+)$')
reverse_domain_re = re.compile(r'^(%\([abcd]\)d|[a-z0-9.-])+$')
class MACAddressFormField(fields.RegexField):
default_error_messages = {
'invalid': _(u'Enter a valid MAC address.'),
......@@ -20,8 +22,10 @@ class MACAddressFormField(fields.RegexField):
def __init__(self, *args, **kwargs):
super(MACAddressFormField, self).__init__(mac_re, *args, **kwargs)
class MACAddressField(models.Field):
empty_strings_allowed = False
def __init__(self, *args, **kwargs):
kwargs['max_length'] = 17
super(MACAddressField, self).__init__(*args, **kwargs)
......@@ -35,47 +39,68 @@ class MACAddressField(models.Field):
return super(MACAddressField, self).formfield(**defaults)
add_introspection_rules([], ["firewall\.fields\.MACAddressField"])
def val_alfanum(value):
"""Validate whether the parameter is a valid alphanumeric value."""
if not alfanum_re.match(value):
raise ValidationError(_(u'%s - only letters, numbers, underscores '
'and hyphens are allowed!') % value)
'and hyphens are allowed!') % value)
def is_valid_domain(value):
"""Check whether the parameter is a valid domain name."""
return domain_re.match(value) is not None
def val_domain(value):
"""Validate whether the parameter is a valid domin name."""
if not is_valid_domain(value):
raise ValidationError(_(u'%s - invalid domain name') % value)
def is_valid_reverse_domain(value):
"""Check whether the parameter is a valid reverse domain name."""
return reverse_domain_re.match(value) is not None
def val_reverse_domain(value):
"""Validate whether the parameter is a valid reverse domain name."""
if not is_valid_reverse_domain(value):
raise ValidationError(u'%s - invalid reverse domain name' % value)
def is_valid_ipv4_address(value):
"""Check whether the parameter is a valid IPv4 address."""
return ipv4_re.match(value) is not None
def val_ipv4(value):
"""Validate whether the parameter is a valid IPv4 address."""
if not is_valid_ipv4_address(value):
raise ValidationError(_(u'%s - not an IPv4 address') % value)
def val_ipv6(value):
"""Validate whether the parameter is a valid IPv6 address."""
if not is_valid_ipv6_address(value):
raise ValidationError(_(u'%s - not an IPv6 address') % value)
def val_mx(value):
"""Validate whether the parameter is a valid MX address definition.
Expected form is <priority>:<hostname>.
mx = value.split(':', 1)
if not (len(mx) == 2 and mx[0].isdigit() and
raise ValidationError(_("Bad MX address format. "
"Should be: <priority>:<hostname>"))
def ipv4_2_ipv6(ipv4):
"""Convert IPv4 address string to IPv6 address string."""
m = ipv4_re.match(ipv4)
return ("2001:738:2001:4031:%s:%s:%s:0" %
from celery.task import Task, PeriodicTask
import celery
from django.core.cache import cache
import os
import time
from firewall.fw import *
import django.conf
settings = django.conf.settings.FIREWALL_SETTINGS
def reload_dns_task(data):
def reload_firewall_task(data4, data6):
def reload_dhcp_task(data):
def reload_blacklist_task(data):
class Periodic(PeriodicTask):
run_every = timedelta(seconds=10)
......@@ -48,10 +54,11 @@ class Periodic(PeriodicTask):
print "blacklist ujratoltese kesz"
class ReloadTask(Task):
def run(self, type='Host'):
if type in ["Host", "Records", "Domain", "Vlan"]:
if type in ["Host", "Record", "Domain", "Vlan"]:
cache.add("dns_lock", "true", 30)
if type == "Host":
......@@ -64,4 +71,3 @@ class ReloadTask(Task):
cache.add("blacklist_lock", "true", 30)
print type
from django.test import TestCase
from admin import HostAdmin
class MockInstance:
def __init__(self, groups):
self.groups = MockGroups(groups)
class MockGroup:
def __init__(self, name): = name
class MockGroups:
def __init__(self, groups):
self.groups = groups
......@@ -16,6 +19,7 @@ class MockGroups:
def all(self):
return self.groups
class HostAdminTestCase(TestCase):
def test_no_groups(self):
instance = MockInstance([])
......@@ -29,6 +33,6 @@ class HostAdminTestCase(TestCase):
def test_multiple_groups(self):
instance = MockInstance([MockGroup("alma"),
MockGroup("korte"), MockGroup("szilva")])
MockGroup("korte"), MockGroup("szilva")])
l = HostAdmin.list_groups(instance)
self.assertEqual(l, "alma, korte, szilva")
......@@ -2,12 +2,10 @@ import base64
import datetime
import json
import re
import sys
from django.conf import settings
from django.db import IntegrityError
from django.http import HttpResponse
from django.shortcuts import render_to_response
from django.template.loader import render_to_string
from django.utils import translation
from django.utils.timezone import utc
......@@ -15,13 +13,13 @@ from django.utils.translation import ugettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from celery.task.control import inspect
from tasks import *
from firewall.fw import *
from firewall.models import *
from one.tasks import SendMailTask
def reload_firewall(request):
if request.user.is_authenticated():
if request.user.is_superuser:
......@@ -34,36 +32,46 @@ def reload_firewall(request):
html = _("Dear anonymous, you've not signed in yet!")
return HttpResponse(html)
def firewall_api(request):
data = json.loads(base64.b64decode(request.POST["data"]))
command = request.POST["command"]
if data["password"] != "bdmegintelrontottaanetet":
raise Exception(_("Wrong password."))
if command == "blacklist":
obj, created = Blacklist.objects.get_or_create(ipv4=data["ip"])
obj.reason = data["reason"]
obj.snort_message = data["snort_message"]
if created:
try: = models.Host.objects.get(ipv4=data["ip"]) = Host.objects.get(ipv4=data["ip"])
user =
lang = user.person_set.all()[0].language
msg = render_to_string('mails/notification-ban-now.txt',
{ 'user': user,
'bl': obj,
'url': settings.CLOUD_URL} )
SendMailTask.delay(, subject='[IK Cloud] %s' %, msg=msg, sender=u'')
except (Host.DoesNotExist, ValidationError, IntegrityError, AttributeError):
msg = render_to_string(
'user': user,
'bl': obj,
'url': settings.CLOUD_URL
subject='[IK Cloud] %s' %,
msg=msg, sender=u'')
except (Host.DoesNotExist, ValidationError,
IntegrityError, AttributeError):
print obj.modified_at + datetime.timedelta(minutes=5)
print datetime.datetime.utcnow().replace(tzinfo=utc)
if obj.type == 'tempwhite' and obj.modified_at + datetime.timedelta(minutes=1) < datetime.datetime.utcnow().replace(tzinfo=utc):
modified = obj.modified_at + datetime.timedelta(minutes=1)
now = datetime.dateime.utcnow().replace(tzinfo=utc)
if obj.type == 'tempwhite' and modified < now:
obj.type = 'tempban'
return HttpResponse(unicode(_("OK")))
......@@ -76,28 +84,27 @@ def firewall_api(request):
if command == "create":
data["owner"] = "opennebula"
owner = auth.models.User.objects.get(username=data["owner"])
host = models.Host(hostname=data["hostname"],
mac=data["mac"], ipv4=data["ip"], owner=owner,
description=data["description"], pub_ipv4=models.
host = Host(hostname=data["hostname"],
mac=data["mac"], ipv4=data["ip"], owner=owner,
description=data["description"], pub_ipv4=
for p in data["portforward"]:
host.add_port(proto=p["proto"], public=int(p["public_port"]),
elif command == "destroy":
data["owner"] = "opennebula"
print data["hostname"]
owner = auth.models.User.objects.get(username=data["owner"])
host = models.Host.objects.get(hostname=data["hostname"],
host = Host.objects.get(hostname=data["hostname"],
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment