Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
Fukász Rómeó Ervin
/
cloud
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Members
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
A prog2-höz tartozó friss repo anyagok itt elérhetőek:
https://git.iit.bme.hu/
Commit
76a93346
authored
Jun 18, 2013
by
Őry Máté
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
firewall: refactor complex methods
parent
9fa75044
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
104 additions
and
62 deletions
+104
-62
firewall/fields.py
+11
-0
firewall/models.py
+93
-62
No files found.
firewall/fields.py
View file @
76a93346
...
...
@@ -73,6 +73,17 @@ def val_ipv6(value):
if
not
is_valid_ipv6_address
(
value
):
raise
ValidationError
(
_
(
u'
%
s - not an IPv6 address'
)
%
value
)
def
val_mx
(
value
):
"""Validate whether the parameter is a valid MX address definition.
Expected form is <priority>:<hostname>.
"""
mx
=
self
.
address
.
split
(
':'
,
1
)
if
not
(
len
(
mx
)
==
2
and
mx
[
0
]
.
isdigit
()
and
domain_re
.
match
(
mx
[
1
])):
raise
ValidationError
(
_
(
"Bad MX address format. "
"Should be: <priority>:<hostname>"
))
def
ipv4_2_ipv6
(
ipv4
):
"""Convert IPv4 address string to IPv6 address string."""
val_ipv4
(
ipv4
)
...
...
firewall/models.py
View file @
76a93346
...
...
@@ -244,44 +244,67 @@ class Host(models.Model):
def
enable_net
(
self
):
self
.
groups
.
add
(
Group
.
objects
.
get
(
name
=
"netezhet"
))
def
add_port
(
self
,
proto
,
public
=
None
,
private
=
None
):
proto
=
"tcp"
if
proto
==
"tcp"
else
"udp"
def
_get_ports_used
(
self
,
proto
):
"""
Gives a list of port numbers used for the public IP address of current
host for the given protocol.
:param proto: The transport protocol of the generated port (tcp|udp).
:type proto: str.
:returns: list -- list of int port numbers used.
"""
if
self
.
shared_ip
:
used_ports
=
Rule
.
objects
.
filter
(
host__pub_ipv4
=
self
.
pub_ipv4
,
nat
=
True
,
proto
=
proto
)
.
values_list
(
'dport'
,
flat
=
True
)
if
public
is
None
:
public
=
random
.
randint
(
1024
,
21000
)
if
public
in
used_ports
:
for
i
in
range
(
1024
,
21000
)
+
range
(
24000
,
65535
):
if
i
not
in
used_ports
:
public
=
i
break
else
:
raise
ValidationError
(
_
(
"Port
%
s
%
s is already in use."
)
%
(
proto
,
public
))
ports
=
Rule
.
objects
.
filter
(
host__pub_ipv4
=
self
.
pub_ipv4
,
nat
=
True
,
proto
=
proto
)
else
:
ports
=
self
.
rules
.
filter
(
proto
=
proto
,
)
return
ports
.
values_list
(
'dport'
,
flat
=
True
)
def
_get_random_port
(
self
,
proto
,
used_ports
=
None
):
"""
Get a random unused port for given protocol for current host's public
IP address.
:param proto: The transport protocol of the generated port (tcp|udp).
:type proto: str.
:param used_ports: Optional list of used ports returned by
_get_ports_used.
:returns: int -- the generated port number.
:raises: ValidationError
"""
if
used_ports
is
None
:
used_ports
=
self
.
_get_ports_used
(
proto
)
public
=
random
.
randint
(
1024
,
21000
)
# pick a random port
if
public
in
used_ports
:
# if it's in use, select smallest free one
for
i
in
range
(
1024
,
21000
)
+
range
(
24000
,
65535
):
if
i
not
in
used_ports
:
public
=
i
break
else
:
if
public
<
1024
:
raise
ValidationError
(
_
(
"Only ports above 1024 can be used."
))
if
public
in
used_ports
:
raise
ValidationError
(
_
(
"Port
%
s
%
s is already in use."
)
%
(
proto
,
public
))
vg
=
VlanGroup
.
objects
.
get
(
name
=
settings
[
"default_vlangroup"
])
raise
ValidationError
(
_
(
"All
%
s ports are already in use."
)
%
proto
)
def
add_port
(
self
,
proto
,
public
=
None
,
private
=
None
):
assert
proto
in
(
'tcp'
,
'udp'
,
)
if
public
:
if
public
in
self
.
_get_ports_used
(
proto
):
raise
ValidationError
(
_
(
"Port
%
s
%
s is already in use."
)
%
(
proto
,
public
))
else
:
public
=
self
.
_get_random_port
(
proto
)
vg
=
VlanGroup
.
objects
.
get
(
name
=
settings
[
"default_vlangroup"
])
if
self
.
shared_ip
:
if
public
<
1024
:
raise
ValidationError
(
_
(
"Only ports above 1024 can be used."
))
rule
=
Rule
(
direction
=
'1'
,
owner
=
self
.
owner
,
dport
=
public
,
proto
=
proto
,
nat
=
True
,
accept
=
True
,
r_type
=
"host"
,
nat_dport
=
private
,
host
=
self
,
foreign_network
=
vg
)
else
:
if
self
.
rules
.
filter
(
proto
=
proto
,
dport
=
public
):
raise
ValidationError
(
_
(
"Port
%
s
%
s is already in use."
)
%
(
proto
,
public
))
rule
=
Rule
(
direction
=
'1'
,
owner
=
self
.
owner
,
dport
=
public
,
proto
=
proto
,
nat
=
False
,
accept
=
True
,
r_type
=
"host"
,
host
=
self
,
foreign_network
=
VlanGroup
.
objects
.
get
(
name
=
settings
[
"default_vlangroup"
]))
host
=
self
,
foreign_network
=
vg
)
rule
.
full_clean
()
rule
.
save
()
...
...
@@ -389,43 +412,51 @@ class Record(models.Model):
self
.
full_clean
()
super
(
Record
,
self
)
.
save
(
*
args
,
**
kwargs
)
def
_validate_w_host
(
self
):
"""Validate a record with host set."""
assert
self
.
host
if
self
.
type
in
[
'A'
,
'AAAA'
]:
if
self
.
address
:
raise
ValidationError
(
_
(
"Can't specify address for A "
"or AAAA records if host is set!"
))
if
self
.
name
:
raise
ValidationError
(
_
(
"Can't specify name for A "
"or AAAA records if host is set!"
))
elif
self
.
type
==
'CNAME'
:
if
not
self
.
name
:
raise
ValidationError
(
_
(
"Name must be specified for "
"CNAME records if host is set!"
))
if
self
.
address
:
raise
ValidationError
(
_
(
"Can't specify address for "
"CNAME records if host is set!"
))
def
_validate_wo_host
(
self
):
"""Validate a record without a host set."""
assert
self
.
host
is
None
if
not
self
.
address
:
raise
ValidationError
(
_
(
"Address must be specified!"
))
if
self
.
type
==
'A'
:
val_ipv4
(
self
.
address
)
elif
self
.
type
==
'AAAA'
:
val_ipv6
(
self
.
address
)
elif
self
.
type
in
[
'CNAME'
,
'NS'
,
'PTR'
,
'TXT'
]:
val_domain
(
self
.
address
)
elif
self
.
type
==
'MX'
:
val_mx
(
self
.
address
)
else
:
raise
ValidationError
(
_
(
"Unknown record type."
))
def
clean
(
self
):
"""Validate the Record to be saved.
"""
if
self
.
name
:
self
.
name
=
self
.
name
.
rstrip
(
"."
)
# remove trailing dots
if
self
.
host
:
if
self
.
type
in
[
'A'
,
'AAAA'
]:
if
self
.
address
:
raise
ValidationError
(
_
(
"Can't specify address for A "
"or AAAA records if host is set!"
))
if
self
.
name
:
raise
ValidationError
(
_
(
"Can't specify name for A "
"or AAAA records if host is set!"
))
elif
self
.
type
==
'CNAME'
:
if
not
self
.
name
:
raise
ValidationError
(
_
(
"Name must be specified for "
"CNAME records if host is set!"
))
if
self
.
address
:
raise
ValidationError
(
_
(
"Can't specify address for "
"CNAME records if host is set!"
))
else
:
# if self.host is None
if
not
self
.
address
:
raise
ValidationError
(
_
(
"Address must be specified!"
))
if
self
.
type
==
'A'
:
val_ipv4
(
self
.
address
)
elif
self
.
type
==
'AAAA'
:
val_ipv6
(
self
.
address
)
elif
self
.
type
in
[
'CNAME'
,
'NS'
,
'PTR'
,
'TXT'
]:
val_domain
(
self
.
address
)
elif
self
.
type
==
'MX'
:
mx
=
self
.
address
.
split
(
':'
,
1
)
if
not
(
len
(
mx
)
==
2
and
mx
[
0
]
.
isdigit
()
and
domain_re
.
match
(
mx
[
1
])):
raise
ValidationError
(
_
(
"Bad MX address format. "
"Should be: <priority>:<name>"
))
else
:
raise
ValidationError
(
_
(
"Unknown record type."
))
self
.
_validate_w_host
()
else
:
self
.
_validate_wo_host
()
def
__get_name
(
self
):
if
self
.
host
and
self
.
type
!=
'MX'
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment