Commit de1ac429 by Bach Dániel

firewall: rewrite ip allocation policy

fixes #88
parent 15cff265
# -*- coding: utf-8 -*-
from itertools import islice, chain
from itertools import islice, ifilter
import logging
from netaddr import IPSet, EUI
......@@ -298,19 +298,12 @@ class Vlan(AclBase, models.Model):
def prefix6(self):
return self.network6.prefixlen
def get_next_address(self, used_v4):
try:
last_address = list(used_v4)[-1]
except IndexError:
return []
next_address = last_address + 1
if next_address in self.network4.iter_hosts():
logger.debug("Found unused IPv4 address %s after %s.",
next_address, last_address)
return [next_address]
else:
return []
def get_random_addresses(self, used_v4, buffer_size=100, max_hosts=10000):
addresses = islice(self.network4.iter_hosts(), max_hosts)
unused_addresses = list(islice(
ifilter(lambda x: x not in used_v4, addresses), buffer_size))
random.shuffle(unused_addresses)
return unused_addresses
def get_new_address(self):
hosts = self.host_set
......@@ -318,14 +311,11 @@ class Vlan(AclBase, models.Model):
used_v6 = IPSet(hosts.exclude(ipv6__isnull=True)
.values_list('ipv6', flat=True))
for ipv4 in chain(self.get_next_address(used_v4),
islice(self.network4.iter_hosts(), 10000)):
ipv4 = str(ipv4)
if ipv4 not in used_v4:
for ipv4 in self.get_random_addresses(used_v4):
logger.debug("Found unused IPv4 address %s.", ipv4)
ipv6 = None
if self.network6 is not None:
ipv6 = ipv4_2_ipv6(self.ipv6_template, ipv4)
ipv6 = convert_ipv4_to_ipv6(self.ipv6_template, ipv4)
if ipv6 in used_v6:
continue
else:
......
from netaddr import IPSet
from django.test import TestCase
from django.contrib.auth.models import User
from ..admin import HostAdmin
......@@ -75,13 +77,9 @@ class GetNewAddressTestCase(TestCase):
vlan=self.vlan, owner=self.u1).save()
self.assertRaises(ValidationError, self.vlan.get_new_address)
def test_new_addr_last(self):
self.assertEqual(self.vlan.get_new_address()['ipv4'], '10.0.0.6')
def test_new_addr_w_overflow(self):
Host(hostname='h-6', mac='01:02:03:04:05:06',
ipv4='10.0.0.6', vlan=self.vlan, owner=self.u1).save()
self.assertEqual(self.vlan.get_new_address()['ipv4'], '10.0.0.2')
def test_new_addr(self):
used_v4 = IPSet(self.vlan.host_set.values_list('ipv4', flat=True))
assert self.vlan.get_new_address()['ipv4'] not in used_v4
class HostGetHostnameTestCase(TestCase):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment