# iptables.py 3.34 KB
# Newer Older
# Bach Dániel committed
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
import logging
import re
from collections import OrderedDict

# Module-level logger. getLogger() is called without a name, so this is the
# root logger rather than a per-module one.
logger = logging.getLogger()

# Dotted-quad IPv4 address: four octets, each matched by one of
# 25[0-5] | 2[0-4]\d | [0-1]?\d?\d, separated by literal dots.
# The pattern is anchored at the start only, so text may follow the address.
# NOTE(review): because of the leading '^', ipv4_re.search() succeeds only when
# the string *begins* with an address; an address in the middle of a longer
# option string is not detected -- confirm this is the intended behaviour.
ipv4_re = re.compile(
    r'^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}')


class InvalidRuleExcepion(Exception):
    """Signals that a rule was requested with an unsupported option set."""


class IptRule(object):
    """One iptables rule that can be rendered as IPv4 or IPv6 arguments.

    Instances compare equal (and hash equal) when all their attributes are
    equal, and order by ``priority`` only, so they can be stored in a set and
    sorted.
    """

    def __init__(self, priority=1000, action=None, src=None, dst=None,
                 proto=None, sport=None, dport=None, extra=None,
                 ipv4_only=False, comment=None):
        """Validate the options and store them on the instance.

        :param priority: sort key; converted with ``int()``.
        :param action: rendered as ``-g <action>``.
        :param src: ``(ipv4, ipv6)`` tuple; any non-tuple value is ignored.
        :param dst: ``(ipv4, ipv6)`` tuple; any non-tuple value is ignored.
        :param proto: ``'tcp'``, ``'udp'``, ``'icmp'`` or ``None``.
        :param sport: source port; only allowed with tcp/udp.
        :param dport: destination port; only allowed with tcp/udp.
        :param extra: free-form text inserted verbatim into the rule.
        :param ipv4_only: force the rule to be skipped for IPv6 output.
        :param comment: rendered as ``-m comment --comment "<comment>"``.
        :raises InvalidRuleExcepion: for an unknown protocol, or when a port
            is given with a protocol other than tcp/udp.
        """
        if proto not in ('tcp', 'udp', 'icmp', None):
            raise InvalidRuleExcepion()
        has_port = sport is not None or dport is not None
        if has_port and proto not in ('tcp', 'udp'):
            raise InvalidRuleExcepion()

        self.priority = int(priority)
        self.action = action

        # A tuple without an IPv6 half makes the whole rule IPv4-only.
        (self.src4, self.src6) = (None, None)
        if isinstance(src, tuple):
            (self.src4, self.src6) = src
            if not self.src6:
                ipv4_only = True

        (self.dst4, self.dst6) = (None, None)
        if isinstance(dst, tuple):
            (self.dst4, self.dst6) = dst
            if not self.dst6:
                ipv4_only = True

        self.proto = proto
        self.sport = sport
        self.dport = dport

        self.extra = extra
        # The rule is also IPv4-only when ``extra`` matches the module-level
        # ipv4_re pattern. Parentheses make the or/and precedence explicit.
        self.ipv4_only = (bool(ipv4_only) or
                          (extra is not None and
                           bool(ipv4_re.search(extra))))
        self.comment = comment

    def __hash__(self):
        # Built from the same data __eq__ compares, so equal rules collide.
        return hash(frozenset(self.__dict__.items()))

    def __eq__(self, other):
        # Returning NotImplemented lets Python fall back to its default
        # comparison instead of raising AttributeError on foreign objects.
        if not isinstance(other, IptRule):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __lt__(self, other):
        return self.priority < other.priority

    def __repr__(self):
        return '<IptRule: @%d %s >' % (self.priority, self.compile())

    def __unicode__(self):
        return self.__repr__()

    def compile(self, proto='ipv4'):
        """Return the rule as a space-separated iptables argument string.

        :param proto: ``'ipv4'`` selects the IPv4 addresses; any other value
            selects the IPv6 ones.
        :returns: the options whose attribute is not ``None``, in a fixed
            order: source, destination, protocol, ports, extra, comment,
            action.
        """
        use_v4 = proto == 'ipv4'
        templates = (
            ('src4' if use_v4 else 'src6', '-s %s'),
            ('dst4' if use_v4 else 'dst6', '-d %s'),
            ('proto', '-p %s'),
            ('sport', '--sport %s'),
            ('dport', '--dport %s'),
            ('extra', '%s'),
            ('comment', '-m comment --comment "%s"'),
            ('action', '-g %s'),
        )
        params = []
        for attr, template in templates:
            value = getattr(self, attr)
            if value is not None:
                params.append(template % value)
        return ' '.join(params)


class IptChain(object):
    """A named collection of rules rendered as ``-A <name> ...`` lines.

    Rules are kept in a set, so adding an equal rule twice stores it once.
    """

    # Chain-name constants. They are not referenced inside this class;
    # presumably callers use them -- verify against the call sites.
    nat_chains = ('PREROUTING', 'POSTROUTING')
    builtin_chains = ('FORWARD', 'INPUT', 'OUTPUT') + nat_chains

    def __init__(self, name):
        """Create an empty chain called ``name``."""
        self.rules = set()
        self.name = name

    def add(self, *args, **kwargs):
        """Add every positional argument as a rule.

        Keyword arguments are accepted but ignored, as before.
        """
        self.rules.update(args)

    def sort(self):
        """Return the rules as a list ordered by ascending priority.

        Rules with equal priority are ordered by their rendered text, so the
        result does not depend on set iteration order.
        """
        return sorted(self.rules,
                      key=lambda rule: (rule.priority,
                                        rule.compile('ipv4'),
                                        rule.compile('ipv6')))

    def __len__(self):
        return len(self.rules)

    def __repr__(self):
        return '<IptChain: %s %s>' % (self.name, self.rules)

    def __unicode__(self):
        return self.__repr__()

    def compile(self, proto='ipv4'):
        """Render the chain, one ``-A <name> <rule>`` line per rule.

        :param proto: ``'ipv4'`` or ``'ipv6'``.
        :returns: newline-joined lines in sorted order; rules flagged
            ``ipv4_only`` are left out of the IPv6 output.
        :raises ValueError: if ``proto`` is not one of the two values.
        """
        # Explicit check instead of assert, which is stripped under -O.
        if proto not in ('ipv4', 'ipv6'):
            raise ValueError('proto must be "ipv4" or "ipv6", got %r'
                             % (proto,))
        prefix = '-A %s ' % self.name
        skip_v4_only = proto == 'ipv6'
        lines = []
        for rule in self.sort():
            if skip_v4_only and rule.ipv4_only:
                continue
            lines.append(prefix + rule.compile(proto))
        return '\n'.join(lines)

    def compile_v6(self):
        """Shorthand for ``compile('ipv6')``."""
        return self.compile('ipv6')