iptables.py 3.35 KB
Newer Older
Bach Dániel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
import logging
import re
from collections import OrderedDict

logger = logging.getLogger()

ipv4_re = re.compile(
    r'^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}')


class InvalidRuleExcepion(Exception):
    pass


class IptRule(object):

    def __init__(self, priority=1000, action=None, src=None, dst=None,
18
                 proto=None, sport=None, dport=None, extra=None,
19
                 ipv4_only=False, comment=None):
Bach Dániel committed
20 21 22 23 24 25 26 27 28 29 30 31
        if proto not in ['tcp', 'udp', 'icmp', None]:
            raise InvalidRuleExcepion()
        if proto not in ['tcp', 'udp'] and (sport is not None or
                                            dport is not None):
            raise InvalidRuleExcepion()

        self.priority = int(priority)
        self.action = action

        (self.src4, self.src6) = (None, None)
        if isinstance(src, tuple):
            (self.src4, self.src6) = src
32 33
            if not self.src6:
                ipv4_only = True
Bach Dániel committed
34 35 36
        (self.dst4, self.dst6) = (None, None)
        if isinstance(dst, tuple):
            (self.dst4, self.dst6) = dst
37 38
            if not self.dst6:
                ipv4_only = True
Bach Dániel committed
39 40 41 42 43 44

        self.proto = proto
        self.sport = sport
        self.dport = dport

        self.extra = extra
45 46
        self.ipv4_only = (ipv4_only or
                          extra is not None and bool(ipv4_re.search(extra)))
47
        self.comment = comment
Bach Dániel committed
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70

    def __hash__(self):
        return hash(frozenset(self.__dict__.items()))

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    def __lt__(self, other):
        return self.priority < other.priority

    def __repr__(self):
        return '<IptRule: @%d %s >' % (self.priority, self.compile())

    def __unicode__(self):
        return self.__repr__()

    def compile(self, proto='ipv4'):
        opts = OrderedDict([('src4' if proto == 'ipv4' else 'src6', '-s %s'),
                            ('dst4' if proto == 'ipv4' else 'dst6', '-d %s'),
                            ('proto', '-p %s'),
                            ('sport', '--sport %s'),
                            ('dport', '--dport %s'),
                            ('extra', '%s'),
71
                            ('comment', '-m comment --comment "%s"'),
Bach Dániel committed
72 73 74 75 76 77 78 79
                            ('action', '-g %s')])
        params = [opts[param] % getattr(self, param)
                  for param in opts
                  if getattr(self, param) is not None]
        return ' '.join(params)


class IptChain(object):
80 81
    nat_chains = ('PREROUTING', 'POSTROUTING')
    builtin_chains = ('FORWARD', 'INPUT', 'OUTPUT') + nat_chains
Bach Dániel committed
82 83 84 85 86 87 88 89 90 91

    def __init__(self, name):
        self.rules = set()
        self.name = name

    def add(self, *args, **kwargs):
        for rule in args:
            self.rules.add(rule)

    def sort(self):
Bach Dániel committed
92
        return sorted(list(self.rules), reverse=True)
Bach Dániel committed
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108

    def __len__(self):
        return len(self.rules)

    def __repr__(self):
        return '<IptChain: %s %s>' % (self.name, self.rules)

    def __unicode__(self):
        return self.__repr__()

    def compile(self, proto='ipv4'):
        assert proto in ('ipv4', 'ipv6')
        prefix = '-A %s ' % self.name
        return '\n'.join([prefix + rule.compile(proto)
                          for rule in self.sort()
                          if not (proto == 'ipv6' and rule.ipv4_only)])
109 110 111

    def compile_v6(self):
        return self.compile('ipv6')