Commit 65ef48fc by Szabolcs Gelencser

Remove firewall.conf generating functionality. Implement Azure network

creation functionality. Fix celery version to make resultqueue work
with portal.
parent a2f97e1b
from celery import Celery, task
from os import getenv
import re
import json
import logging
from utils import (ns_exec, sudo, ADDRESSES, get_network_type,
dhcp_no_free_re, dhcp_ack_re, is_there_systemd)
from azure.common.credentials import ServicePrincipalCredentials
import azure.mgmt.network
DHCP_LOGFILE = getenv('DHCP_LOGFILE', '/var/log/syslog')
VLAN_CONF = getenv('VLAN_CONF', 'vlan.conf')
FIREWALL_CONF = getenv('FIREWALL_CONF', 'firewall.conf')
CACHE_URI = getenv('CACHE_URI')
AMQP_URI = getenv('AMQP_URI')
celery = Celery('tasks',)
celery.conf.update(CELERY_TASK_RESULT_EXPIRES=300,
SUBSCRIPTION_ID = getenv('SUBSCRIPTION_ID')
CLIENT_ID = getenv('CLIENT_ID')
SECRET = getenv('SECRET')
TENANT = getenv('TENANT')
GROUP_NAME = getenv('GROUP_NAME')
REGION = getenv('REGION')
subscription_id = SUBSCRIPTION_ID
credentials = ServicePrincipalCredentials(
client_id = CLIENT_ID,
secret = SECRET,
tenant = TENANT,
)
network_client = azure.mgmt.network.NetworkManagementClient(
credentials,
subscription_id
)
celery = Celery('fw',)
celery.conf.update(
BROKER_URL=AMQP_URI,
CELERY_CREATE_MISSING_QUEUES=True)
celery.conf.update(CELERY_CACHE_BACKEND=CACHE_URI,
CELERY_RESULT_BACKEND='cache')
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300,
)
logger = logging.getLogger(__name__)
@task(name="firewall.reload_firewall")
def reload_firewall(data4, data6, save_config=True):
try:
ns_exec(('ip6tables-restore', '-c'), data6)
ns_exec(('iptables-restore', '-c'), data4)
except:
logging.exception('Unhandled exception: ')
raise
if save_config:
with open(FIREWALL_CONF, 'w') as f:
json.dump([data4, data6], f)
logger.info("Firewall configuration is reloaded.")
@task(name="firewall.reload_firewall_vlan")
def reload_firewall_vlan(data, save_config=True):
network_type = get_network_type()
if network_type is None:
logger.info("Ignored reload_firewall_vlan() network type=%s",
network_type)
return
# Add additional addresses from config
for k, v in ADDRESSES.items():
data[k]['addresses'] += v
uplink = getenv('UPLINK', None)
if uplink:
data[uplink] = {'interfaces': uplink}
print network_type
br = network_type('firewall')
br.migrate(data)
if save_config:
with open(VLAN_CONF, 'w') as f:
json.dump(data, f)
@task(name="firewall.create_network")
def create_network(name, address_prefix):
logger.info("creating network: %s" % name)
# virtual network
poller = network_client.virtual_networks.create_or_update(
GROUP_NAME,
name,
azure.mgmt.network.models.VirtualNetwork(
location=REGION,
address_space=azure.mgmt.network.models.AddressSpace(
address_prefixes=[
address_prefix,
],
),
subnets=[
azure.mgmt.network.models.Subnet(
name="%s.subnet" % name,
address_prefix=address_prefix,
),
],
),
)
try:
ns_exec(('ip', 'ro', 'add', 'default', 'via',
getenv('GATEWAY', '152.66.243.254')))
except:
pass
logger.info("Interface (vlan) configuration is reloaded.")
@task(name="firewall.reload_dhcp")
def reload_dhcp(data):
with open('/etc/dhcp/dhcpd.conf.generated', 'w') as f:
f.write("\n".join(data) + "\n")
if is_there_systemd():
sudo(('/bin/systemctl', 'restart', 'dhcpd'))
else:
sudo(('/etc/init.d/isc-dhcp-server', 'restart'))
logger.info("DHCP configuration is reloaded.")
def ipset_save(data):
r = re.compile(r'^add blacklist ([0-9.]+)$')
data_new = [x['ipv4'] for x in data]
data_old = []
lines = ns_exec(('ipset', 'save', 'blacklist'))
for line in lines.splitlines():
x = r.match(line.rstrip())
if x:
data_old.append(x.group(1))
l_add = list(set(data_new).difference(set(data_old)))
l_del = list(set(data_old).difference(set(data_new)))
return (l_add, l_del)
def ipset_restore(l_add, l_del):
ipset = []
ipset.append('create blacklist hash:ip family inet hashsize '
'4096 maxelem 65536')
ipset += ['add blacklist %s' % x for x in l_add]
ipset += ['del blacklist %s' % x for x in l_del]
ns_exec(('ipset', 'restore', '-exist'),
'\n'.join(ipset) + '\n')
@task(name="firewall.reload_blacklist")
def reload_blacklist(data):
l_add, l_del = ipset_save(data)
ipset_restore(l_add, l_del)
logger.info("Blacklist configuration is reloaded.")
@task(name="firewall.get_dhcp_clients")
def get_dhcp_clients():
clients = {}
with open(DHCP_LOGFILE, 'r') as f:
for line in f:
m = dhcp_ack_re.search(line)
if m is None:
m = dhcp_no_free_re.search(line)
if m is None:
continue
m = m.groupdict()
mac = m['mac']
ip = m.get('ip', None)
hostname = m.get('hostname', None)
interface = m.get('interface', None)
clients[mac] = {'ip': ip, 'hostname': hostname,
'interface': interface}
return clients
def start_firewall():
try:
ns_exec(('ipset', 'create', 'blacklist', 'hash:ip',
'family', 'inet', 'hashsize', '4096', 'maxelem',
'65536'))
except:
pass
try:
with open(FIREWALL_CONF, 'r') as f:
data4, data6 = json.load(f)
reload_firewall(data4, data6, True)
except Exception:
logger.exception('Unhandled exception: ')
def start_networking():
try:
with open(VLAN_CONF, 'r') as f:
data = json.load(f)
reload_firewall_vlan(data, True)
except Exception:
logger.exception('Unhandled exception: ')
def main():
start_networking()
start_firewall()
main()
poller.wait()
logger.info("created network: %s" % name)
return poller.result().id
except Exception, e:
logger.error("cloud not create network: %s" % name)
logger.error(str(e))
return None
\ No newline at end of file
from netaddr import IPNetwork
from subprocess import CalledProcessError
import logging
from utils import NETNS, sudo, ns_exec, HA
logger = logging.getLogger(__name__)
class Interface(object):
def __init__(self, name, data, with_show=False):
# {"interfaces": ["eth1"], "tag": 2, "trunks": [1, 2, 3],
# "type": "internal", "addresses": ["193.006.003.130/24", "None"]}
self.name = name
self.is_internal = data.get('type', 'external') == 'internal'
try:
self.tagged = frozenset(int(i) for i in data['trunks'])
except (TypeError, KeyError):
self.tagged = frozenset()
untagged = data.get('tag')
if (untagged and not self.tagged and unicode(untagged).isdecimal()):
self.untagged = int(untagged)
else:
self.untagged = None
if with_show:
data['addresses'] = self.show()
try:
self.addresses = frozenset(IPNetwork(i) for i in data['addresses']
if i != 'None')
except (TypeError, KeyError):
self.addresses = frozenset()
def __repr__(self):
return '<Interface: %s veth=%s| untagged=%s tagged=%s addrs=%s>' % (
self.name, self.is_internal, self.untagged, self.tagged,
self.addresses)
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __hash__(self):
return reduce(lambda acc, x: acc ^ hash(x),
self.__dict__.values(), 0)
@property
def external_name(self):
if self.is_internal:
return '%s-EXT' % self.name
else:
return self.name
def _run(self, *args):
args = ('ip', 'addr', ) + args
return ns_exec(args)
def show(self):
retval = []
try:
for line in self._run('show', self.name,
'scope', 'global').splitlines():
t = line.split()
if len(t) > 0 and t[0] in ('inet', 'inet6'):
retval.append(IPNetwork(t[1]))
except CalledProcessError:
pass
logger.debug('[ip-%s] show: %s', self.name, str(retval))
return retval
def delete_address(self, address):
self._run('del', str(address), 'dev', self.name)
def add_address(self, address):
self._run('add', str(address), 'dev', self.name)
def up(self):
if self.is_internal:
ns_exec(('ip', 'link', 'set', 'up', self.name))
sudo(('ip', 'link', 'set', 'up', self.external_name))
def migrate(self):
old_addresses = [str(x) for x in self.show()]
new_addresses = [str(x) for x in self.addresses]
to_delete = list(set(old_addresses) - set(new_addresses))
to_add = list(set(new_addresses) - set(old_addresses))
logger.debug('[ip-%s] delete: %s', self.name, str(to_delete))
logger.debug('[ip-%s] add: %s', self.name, str(to_add))
for i in to_delete:
self.delete_address(i)
for i in to_add:
self.add_address(i)
class Switch(object):
def __init__(self, brname):
self.brname = brname
try:
self._run('add-br', brname)
except:
pass
def _run(self, *args):
args = ('ovs-vsctl', ) + args
return sudo(args)
def _setns(self, dev):
args = ('ip', 'link', 'set', dev, 'netns', NETNS)
return sudo(args)
def list_ports(self):
ovs = {}
bridge = None
port = None
# parse ovs-vsctl show
for line in self._run('show').splitlines():
t = line.split()
if t[0] == 'Bridge':
bridge = t[1]
ovs[bridge] = {}
elif t[0] == 'Port':
port = t[1].replace('"', '') # valahol idezojel van
if port.endswith('-EXT'):
port = port.rsplit('-EXT')[0]
type = 'internal'
else:
type = 'external'
ovs[bridge][port] = {'type': type}
elif t[0] == 'tag:':
ovs[bridge][port]['tag'] = int(t[1])
elif t[0] == 'trunks:':
trunks = [int(p.strip('[,]')) for p in t[1:]]
ovs[bridge][port]['trunks'] = trunks
# Create Interface objects
return [Interface(name, data, with_show=True)
for name, data in ovs.get(self.brname, {}).items()
if name != self.brname]
def add_port(self, interface):
params = ['add-port', self.brname, interface.external_name]
if interface.untagged:
params.append('tag=%d' % int(interface.untagged))
if interface.tagged:
params.append('trunks=%s' % list(interface.tagged))
# move interface into namespace
try:
if interface.is_internal:
sudo(('ip', 'link', 'add', interface.external_name,
'type', 'veth', 'peer', 'name', interface.name))
self._setns(interface.name)
except:
logger.exception('Unhandled exception: ')
self._run(*params)
def delete_port(self, interface):
self._run('del-port', self.brname, interface.external_name)
if interface.is_internal:
try:
sudo(('ip', 'link', 'del', interface.external_name))
except CalledProcessError:
pass
def migrate(self, new_ports):
old_interfaces = self.list_ports()
new_interfaces = [Interface(port, data)
for port, data in new_ports.items()]
add = list(set(new_interfaces).difference(set(old_interfaces)))
delete = list(set(old_interfaces).difference(set(new_interfaces)))
logger.debug('[ovs delete]: %s', delete)
logger.debug('[ovs add]: %s', add)
for interface in delete:
self.delete_port(interface)
for interface in add:
self.add_port(interface)
for interface in new_interfaces:
try:
if interface.is_internal or not HA:
interface.up()
except CalledProcessError as e:
logger.warning(e)
try:
interface.migrate()
except CalledProcessError as e:
logger.warning(e)
class Bridge(Switch):
def __init__(self, brname):
self.brname = brname
self.brifnum = brname
try:
sudo(('brctl', 'addbr', brname))
sudo(('ip', 'link', 'set', 'up', brname))
except:
pass
def find_data(self, data, tok):
try:
masteridx = data.index(tok)
return tuple(data[masteridx + 1:])
except (ValueError, IndexError):
return (None, )
def parse_ip_link(self, data):
port = None
ports = {}
for line in data.splitlines():
t = line.split()
if line.startswith(' '):
vlan = self.find_data(t, '802.1Q')
if port in ports and vlan and vlan[0] == 'id':
ports[port]['tag'] = vlan[1]
else:
port, sep, parent = t[1].rstrip(':').partition('@')
if self.find_data(t, 'master')[0] == self.brname:
type = 'external'
elif (parent in (self.brname, self.brifnum) or
port == self.brname):
type = 'internal'
else:
continue
ports[port] = {'type': type, 'ifnum': t[0].rstrip(':')}
return ports
def list_ports(self):
ports = self.parse_ip_link(sudo(('ip', '-d', 'link', 'show')))
brport = ports.pop(self.brname)
self.brifnum = 'if%s' % brport['ifnum']
ports.update(self.parse_ip_link(ns_exec(('ip', '-d', 'link', 'show'))))
return [Interface(name, data, with_show=True)
for name, data in ports.items()]
def delete_port(self, interface):
try:
if interface.is_internal:
ns_exec(('ip', 'link', 'del', interface.name))
else:
sudo(('brctl', 'delif', self.brname, interface.name))
except CalledProcessError:
pass
def add_port(self, interface):
try:
if interface.is_internal:
if not interface.untagged:
return
sudo(('ip', 'link', 'add', 'link', self.brname, 'name',
interface.name, 'type', 'vlan', 'id',
str(interface.untagged)))
self._setns(interface.name)
else:
sudo(('brctl', 'addif', self.brname, interface.name))
except:
logger.exception('Unhandled exception: ')
if __name__ == "__main__":
br = Bridge('br0')
print br.list_ports()
......@@ -3,7 +3,7 @@ amqp==1.0.13
anyjson==0.3.3
argparse==1.2.1
billiard==2.7.3.32
celery==3.0.23
celery==3.1.18
distribute==0.7.3
kombu==2.5.14
netaddr==0.7.10
......
import os
from os import getenv
import subprocess as sp
import logging
import json
import re
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
NETNS = getenv('NETNS', 'fw')
ADDRESSES = json.loads(getenv('ADDRESSES', '{}'))
HA = bool(getenv('HA', False))
def get_network_type():
from ovs import Switch, Bridge
if getenv('BRIDGE_TYPE', 'OVS') == 'BRIDGE':
return Bridge
elif getenv('BRIDGE_TYPE', 'OVS') == 'NONE':
return None
else:
return Switch
# 2013-06-26 12:16:59 DHCPACK on 10.4.0.14 to 5c:b5:24:e6:5c:81
# (android_b555bfdba7c837d) via vlan0004
dhcp_ack_re = re.compile(r'\S DHCPACK on (?P<ip>[0-9.]+) to '
r'(?P<mac>[a-zA-Z0-9:]+) '
r'(\((?P<hostname>[^)]+)\) )?'
r'via (?P<interface>[a-zA-Z0-9]+)')
# 2013-06-25 11:08:38 DHCPDISCOVER from 48:5b:39:8e:82:78
# via vlan0005: network 10.5.0.0/16: no free leases
dhcp_no_free_re = re.compile(r'\S DHCPDISCOVER '
r'from (?P<mac>[a-zA-Z0-9:]+) '
r'via (?P<interface>[a-zA-Z0-9]+):')
def sudo(args, stdin=None):
args = ('/usr/bin/sudo', ) + args
logger.debug('EXEC {}'.format(' '.join(args)))
p = sp.Popen(args, stdin=sp.PIPE, stderr=sp.PIPE, stdout=sp.PIPE)
if isinstance(stdin, basestring):
stdout, stderr = p.communicate(stdin)
else:
stdout, stderr = p.communicate()
if p.returncode != 0:
raise sp.CalledProcessError(
p.returncode, sp.list2cmdline(args), stderr)
return stdout
def ns_exec(args, stdin=None):
if get_network_type() is None:
return sudo(args, stdin)
else:
return sudo(('/sbin/ip', 'netns', 'exec',
NETNS) + args, stdin)
def is_there_systemd():
return os.path.isfile("/bin/systemctl")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment