Commit eed8061b by Bach Dániel

Merge branch 'issue-88' into 'master'

Write new IP allocation policy #88
parents a586fa25 df2e6481
......@@ -261,11 +261,10 @@ def val_mx(value):
"Should be: <priority>:<hostname>"))
def ipv4_2_ipv6(ipv6_template, ipv4):
def convert_ipv4_to_ipv6(ipv6_template, ipv4):
"""Convert IPv4 address string to IPv6 address string."""
val_ipv4(ipv4)
m = ipv4_re.match(ipv4)
return (ipv6_template % {'a': int(m.group(1)),
'b': int(m.group(2)),
'c': int(m.group(3)),
'd': int(m.group(4))})
m = ipv4.words
return IPAddress(ipv6_template % {'a': int(m[0]),
'b': int(m[1]),
'c': int(m[2]),
'd': int(m[3])})
# -*- coding: utf-8 -*-
from itertools import islice, chain
from itertools import islice, ifilter
import logging
from netaddr import IPSet, EUI
......@@ -9,9 +9,9 @@ from django.db import models
from django.forms import ValidationError
from django.utils.translation import ugettext_lazy as _
from firewall.fields import (MACAddressField, val_alfanum, val_reverse_domain,
val_ipv6_template,
val_domain, val_ipv4, val_ipv6, val_mx,
ipv4_2_ipv6, IPNetworkField, IPAddressField)
val_ipv6_template, val_domain, val_ipv4,
val_ipv6, val_mx, convert_ipv4_to_ipv6,
IPNetworkField, IPAddressField)
from django.core.validators import MinValueValidator, MaxValueValidator
import django.conf
from django.db.models.signals import post_save, post_delete
......@@ -298,19 +298,12 @@ class Vlan(AclBase, models.Model):
def prefix6(self):
return self.network6.prefixlen
def get_next_address(self, used_v4):
try:
last_address = list(used_v4)[-1]
except IndexError:
return []
next_address = last_address + 1
if next_address in self.network4.iter_hosts():
logger.debug("Found unused IPv4 address %s after %s.",
next_address, last_address)
return [next_address]
else:
return []
def get_random_addresses(self, used_v4, buffer_size=100, max_hosts=10000):
addresses = islice(self.network4.iter_hosts(), max_hosts)
unused_addresses = list(islice(
ifilter(lambda x: x not in used_v4, addresses), buffer_size))
random.shuffle(unused_addresses)
return unused_addresses
def get_new_address(self):
hosts = self.host_set
......@@ -318,14 +311,11 @@ class Vlan(AclBase, models.Model):
used_v6 = IPSet(hosts.exclude(ipv6__isnull=True)
.values_list('ipv6', flat=True))
for ipv4 in chain(self.get_next_address(used_v4),
islice(self.network4.iter_hosts(), 10000)):
ipv4 = str(ipv4)
if ipv4 not in used_v4:
for ipv4 in self.get_random_addresses(used_v4):
logger.debug("Found unused IPv4 address %s.", ipv4)
ipv6 = None
if self.network6 is not None:
ipv6 = ipv4_2_ipv6(self.ipv6_template, ipv4)
ipv6 = convert_ipv4_to_ipv6(self.ipv6_template, ipv4)
if ipv6 in used_v6:
continue
else:
......@@ -473,7 +463,8 @@ class Host(models.Model):
def save(self, *args, **kwargs):
if not self.id and self.ipv6 == "auto":
self.ipv6 = ipv4_2_ipv6(self.vlan.ipv6_template, self.ipv4)
self.ipv6 = convert_ipv4_to_ipv6(self.vlan.ipv6_template,
self.ipv4)
self.full_clean()
super(Host, self).save(*args, **kwargs)
......
from netaddr import IPSet
from django.test import TestCase
from django.contrib.auth.models import User
from ..admin import HostAdmin
......@@ -75,13 +77,9 @@ class GetNewAddressTestCase(TestCase):
vlan=self.vlan, owner=self.u1).save()
self.assertRaises(ValidationError, self.vlan.get_new_address)
def test_new_addr_last(self):
self.assertEqual(self.vlan.get_new_address()['ipv4'], '10.0.0.6')
def test_new_addr_w_overflow(self):
Host(hostname='h-6', mac='01:02:03:04:05:06',
ipv4='10.0.0.6', vlan=self.vlan, owner=self.u1).save()
self.assertEqual(self.vlan.get_new_address()['ipv4'], '10.0.0.2')
def test_new_addr(self):
used_v4 = IPSet(self.vlan.host_set.values_list('ipv4', flat=True))
assert self.vlan.get_new_address()['ipv4'] not in used_v4
class HostGetHostnameTestCase(TestCase):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment