Commit dfb15f98 by Őry Máté

Merge remote-tracking branch 'origin/acl'

parents be07eae3 6b43a7c3
......@@ -18,6 +18,6 @@ _build
# Logs:
*.log
.ropeproject
#celery
celerybeat-schedule
.coverage
*,cover
"""
Creates Levels for all installed apps that have levels.
"""
from django.db.models import get_models, signals
from django.db import DEFAULT_DB_ALIAS
from django.core.exceptions import ImproperlyConfigured
from ..models import Level, AclBase
def create_levels(app, created_models, verbosity, db=DEFAULT_DB_ALIAS,
**kwargs):
"""Create and set the weights of the configured Levels.
Based on django.contrib.auth.management.__init__.create_permissions"""
# if not router.allow_migrate(db, auth_app.Permission):
# return
from django.contrib.contenttypes.models import ContentType
app_models = [k for k in get_models(app) if AclBase in k.__bases__]
print "Creating levels for models: %s." % ", ".join([m.__name__ for m in app_models])
# This will hold the levels we're looking for as
# (content_type, (codename, name))
searched_levels = list()
level_weights = list()
# The codenames and ctypes that should exist.
ctypes = set()
for klass in app_models:
# Force looking up the content types in the current database
# before creating foreign keys to them.
ctype = ContentType.objects.db_manager(db).get_for_model(klass)
ctypes.add(ctype)
weight = 0
try:
for codename, name in klass.ACL_LEVELS:
searched_levels.append((ctype, (codename, name)))
level_weights.append((ctype, codename, weight))
weight += 1
except AttributeError:
raise ImproperlyConfigured(
"Class %s doesn't have ACL_LEVELS attribute." % klass)
# Find all the Levels that have a content_type for a model we're
# looking for. We don't need to check for codenames since we already have
# a list of the ones we're going to create.
all_levels = set(Level.objects.using(db).filter(
content_type__in=ctypes,
).values_list(
"content_type", "codename"
))
levels = [
Level(codename=codename, name=name, content_type=ctype)
for ctype, (codename, name) in searched_levels
if (ctype.pk, codename) not in all_levels
]
Level.objects.using(db).bulk_create(levels)
if verbosity >= 2:
print("Adding levels [%s]." % ", ".join(levels))
print("Searched: [%s]." % ", ".join([unicode(l) for l in searched_levels]))
print("All: [%s]." % ", ".join([unicode(l) for l in all_levels]))
# set weights
for ctype, codename, weight in level_weights:
Level.objects.filter(codename=codename,
content_type=ctype).update(weight=weight)
signals.post_syncdb.connect(
create_levels, dispatch_uid="circle.acl.management.create_levels")
from django.core.management.base import BaseCommand
from .. import create_levels
class Command(BaseCommand):
args = ''
help = 'Regenerates Levels'
def handle(self, *args, **options):
create_levels(None, None, 3)
import logging
from django.contrib.auth.models import User, Group
from django.contrib.contenttypes.generic import (
GenericForeignKey, GenericRelation
)
from django.contrib.contenttypes.models import ContentType
from django.db.models import (
ManyToManyField, ForeignKey, CharField, Model, IntegerField
)
logger = logging.getLogger(__name__)
class Level(Model):
"""Definition of a permission level.
Instances are automatically populated based on AclBase."""
name = CharField('name', max_length=50)
content_type = ForeignKey(ContentType)
codename = CharField('codename', max_length=100)
weight = IntegerField('weight', null=True)
def __unicode__(self):
return "<%s/%s>" % (unicode(self.content_type), self.name)
class Meta:
unique_together = (('content_type', 'codename'),
# ('content_type', 'weight'),
# TODO find a way of temp. disabling this constr.
)
class ObjectLevel(Model):
"""Permission level for a specific object."""
level = ForeignKey(Level)
content_type = ForeignKey(ContentType)
object_id = CharField(max_length=255)
content_object = GenericForeignKey()
users = ManyToManyField(User)
groups = ManyToManyField(Group)
def __unicode__(self):
return "<%s: %s>" % (unicode(self.content_object), unicode(self.level))
class Meta:
unique_together = (('content_type', 'object_id', 'level'),)
class AclBase(Model):
"""Define permission levels for Users/Groups per object."""
object_level_set = GenericRelation(ObjectLevel)
@classmethod
def get_level_object(cls, level):
"""Get Level object for this model by codename."""
ct = ContentType.objects.get_for_model(cls)
return Level.objects.get(codename=level, content_type=ct)
def set_level(self, whom, level):
"""Set level of object for a user or group.
:param whom: user or group the level is set for
:type whom: User or Group
:param level: codename of level to set
:type level: Level or str or unicode
"""
if isinstance(whom, User):
self.set_user_level(whom, level)
elif isinstance(whom, Group):
self.set_group_level(whom, level)
else:
raise AttributeError('"whom" must be a User or Group object.')
def set_user_level(self, user, level):
"""Set level of object for a user.
:param whom: user the level is set for
:type whom: User
:param level: codename of level to set
:type level: Level or str or unicode
"""
logger.info('%s.set_user_level(%s, %s) called',
*[unicode(p) for p in [self, user, level]])
if isinstance(level, basestring):
level = self.get_level_object(level)
if not self.object_level_set.filter(level_id=level.pk).exists():
self.object_level_set.create(level=level)
for i in self.object_level_set.all():
if i.level_id != level.pk:
i.users.remove(user)
else:
i.users.add(user)
i.save()
def set_group_level(self, group, level):
"""Set level of object for a user.
:param whom: user the level is set for
:type whom: User or unicode or str
:param level: codename of level to set
:type level: str or unicode
"""
logger.info('%s.set_group_level(%s, %s) called',
*[unicode(p) for p in [self, group, level]])
if isinstance(level, basestring):
level = self.get_level_object(level)
#self.object_level_set.get_or_create(level=level, content_object=self)
if not self.object_level_set.filter(level_id=level.pk).exists():
self.object_level_set.create(level=level)
for i in self.object_level_set.all():
if i.level_id != level.pk:
i.groups.remove(group)
else:
i.groups.add(group)
i.save()
def has_level(self, user, level, group_also=True):
logger.debug('%s.has_level(%s, %s, %s) called',
*[unicode(p) for p in [self, user, level, group_also]])
if getattr(user, 'is_superuser', False):
logger.debug('- superuser granted')
return True
if isinstance(level, basestring):
level = self.get_level_object(level)
logger.debug("- level set by str: %s", unicode(level))
object_levels = self.object_level_set.filter(
level__weight__gte=level.weight).all()
groups = user.groups.values_list('id', flat=True) if group_also else []
for i in object_levels:
if i.users.filter(pk=user.pk).exists():
return True
if group_also and i.groups.filter(pk__in=groups).exists():
return True
return False
def get_users_with_level(self):
logger.debug('%s.get_users_with_level() called', unicode(self))
object_levels = (self.object_level_set.select_related(
'users', 'level').all())
users = []
for object_level in object_levels:
name = object_level.level.codename
olusers = object_level.users.all()
users.extend([(u, name) for u in olusers])
logger.debug('- %s: %s' % (name, [u.username for u in olusers]))
return users
def get_groups_with_level(self):
logger.debug('%s.get_groups_with_level() called', unicode(self))
object_levels = (self.object_level_set.select_related(
'groups', 'level').all())
groups = []
for object_level in object_levels:
name = object_level.level.codename
olgroups = object_level.groups.all()
groups.extend([(g, name) for g in olgroups])
logger.debug('- %s: %s' % (name, [g.name for g in olgroups]))
return groups
class Meta:
abstract = True
from django.db.models import TextField
from ..models import AclBase
class TestModel(AclBase):
normal_field = TextField()
ACL_LEVELS = (
('alfa', 'Alfa'),
('bravo', 'Bravo'),
('charlie', 'Charlie'),
)
class Test2Model(AclBase):
normal2_field = TextField()
ACL_LEVELS = (
('one', 'One'),
('two', 'Two'),
('three', 'Three'),
)
from django.test import TestCase
from django.contrib.auth.models import User, Group, AnonymousUser
from ..models import ObjectLevel
from .models import TestModel, Test2Model
class AclUserTest(TestCase):
def setUp(self):
self.u1 = User.objects.create(username='user1')
self.u2 = User.objects.create(username='user2', is_staff=True)
self.us = User.objects.create(username='superuser', is_superuser=True)
self.g1 = Group.objects.create(name='group1')
self.g1.user_set.add(self.u1)
self.g1.user_set.add(self.u2)
self.g1.save()
def test_level_exists(self):
for codename, name in TestModel.ACL_LEVELS:
level = TestModel.get_level_object(codename)
self.assertEqual(level.codename, codename)
for codename, name in Test2Model.ACL_LEVELS:
level = Test2Model.get_level_object(codename)
self.assertEqual(level.codename, codename)
def test_lowest_user_level(self):
i = TestModel.objects.create(normal_field='Hello')
self.assertFalse(i.has_level(self.u1, 'alfa', False))
self.assertFalse(i.has_level(self.u1, 'bravo', False))
i.set_level(self.u1, 'alfa')
i.set_level(self.g1, 'bravo')
self.assertTrue(i.has_level(self.u1, 'alfa', False))
self.assertFalse(i.has_level(self.u1, 'bravo', False))
def test_anonymous_user_level(self):
i = TestModel.objects.create(normal_field='Hello')
anon = AnonymousUser()
self.assertFalse(i.has_level(anon, 'alfa'))
self.assertFalse(i.has_level(anon, 'bravo'))
def test_middle_user_level(self):
i = TestModel.objects.create(normal_field='Hello')
self.assertFalse(i.has_level(self.u1, 'alfa'))
self.assertFalse(i.has_level(self.u1, 'bravo'))
self.assertFalse(i.has_level(self.u1, 'charlie'))
i.set_level(self.u1, 'bravo')
self.assertTrue(i.has_level(self.u1, 'alfa'))
self.assertTrue(i.has_level(self.u1, 'bravo'))
self.assertFalse(i.has_level(self.u1, 'charlie'))
def test_level_set_twice_same(self):
i = TestModel.objects.create(normal_field='Hello')
self.assertFalse(i.has_level(self.u1, 'alfa'))
self.assertFalse(i.has_level(self.u1, 'bravo'))
self.assertFalse(i.has_level(self.u1, 'charlie'))
i.set_level(self.u1, 'bravo')
i.set_level(self.u1, 'bravo')
self.assertTrue(i.has_level(self.u1, 'alfa'))
self.assertTrue(i.has_level(self.u1, 'bravo'))
self.assertFalse(i.has_level(self.u1, 'charlie'))
def test_level_set_twice_different(self):
i = TestModel.objects.create(normal_field='Hello')
self.assertFalse(i.has_level(self.u1, 'alfa'))
self.assertFalse(i.has_level(self.u1, 'bravo'))
self.assertFalse(i.has_level(self.u1, 'charlie'))
i.set_level(self.u1, 'charlie')
i.set_level(self.u1, 'bravo')
self.assertTrue(i.has_level(self.u1, 'alfa'))
self.assertTrue(i.has_level(self.u1, 'bravo'))
self.assertFalse(i.has_level(self.u1, 'charlie'))
def test_superuser(self):
i = TestModel.objects.create(normal_field='Hello')
for u, v in [(self.u1, False), (self.u2, False), (self.us, True)]:
self.assertEqual(i.has_level(u, 'alfa'), v)
self.assertEqual(i.has_level(u, 'bravo'), v)
self.assertEqual(i.has_level(u, 'charlie'), v)
def test_check_group_membership(self):
groups = self.u1.groups.values_list('id', flat=True)
self.assertIn(self.g1.id, groups)
self.assertTrue(self.g1.user_set.filter(id=self.u2.id).exists())
def test_lowest_group_level(self):
i = TestModel.objects.create(normal_field='Hello')
self.assertFalse(i.has_level(self.u1, 'alfa'))
self.assertFalse(i.has_level(self.u1, 'bravo'))
i.set_level(self.g1, 'alfa')
self.assertTrue(i.has_level(self.u1, 'alfa'))
self.assertFalse(i.has_level(self.u1, 'bravo'))
def test_middle_group_level(self):
i = TestModel.objects.create(normal_field='Hello')
self.assertFalse(i.has_level(self.u1, 'alfa'))
self.assertFalse(i.has_level(self.u1, 'bravo'))
self.assertFalse(i.has_level(self.u1, 'charlie'))
i.set_level(self.g1, 'bravo')
self.assertTrue(i.has_level(self.u1, 'alfa'))
self.assertTrue(i.has_level(self.u1, 'bravo'))
self.assertFalse(i.has_level(self.u1, 'charlie'))
def test_set_level_error_handling(self):
with self.assertRaises(AttributeError):
TestModel.objects.create().set_level('wrong arg', 'level')
def test_get_users_with_level(self):
i1 = TestModel.objects.create(normal_field='Hello')
i2 = Test2Model.objects.create(normal2_field='Hello2')
i1.set_level(self.u1, 'bravo')
i1.set_level(self.u2, 'charlie')
i2.set_level(self.u1, 'one')
i2.set_level(self.us, u'three')
res1 = i1.get_users_with_level()
self.assertEqual([(self.u1, u'bravo'), (self.u2, u'charlie')], res1)
res2 = i2.get_users_with_level()
self.assertEqual([(self.u1, u'one'), (self.us, u'three')], res2)
def test_get_groups_with_level(self):
i1 = TestModel.objects.create(normal_field='Hello')
i2 = Test2Model.objects.create(normal2_field='Hello2')
i1.set_level(self.g1, 'bravo')
i1.set_level(self.u2, 'charlie')
i2.set_level(self.g1, 'one')
i2.set_level(self.us, u'three')
res1 = i1.get_groups_with_level()
self.assertEqual([(self.g1, u'bravo')], res1)
res2 = i2.get_groups_with_level()
self.assertEqual([(self.g1, u'one')], res2)
def test_object_level_unicode(self):
i1 = TestModel.objects.create(normal_field='Hello')
i1.set_level(self.g1, 'bravo')
unicode(ObjectLevel.objects.all()[0])
# Create your views here.
"""Common settings and globals."""
from datetime import timedelta
from os import environ
from os.path import abspath, basename, dirname, join, normpath
from json import loads
# from socket import SOCK_STREAM
from sys import path
# Normally you should not import ANYTHING from Django directly
# into your settings, but ImproperlyConfigured is an exception.
from django.core.exceptions import ImproperlyConfigured
......@@ -236,6 +237,7 @@ LOCAL_APPS = (
'network',
'dashboard',
'manager',
'acl',
)
# See: https://docs.djangoproject.com/en/dev/ref/settings/#installed-apps
......@@ -306,5 +308,12 @@ VM_ACCESS_PROTOCOLS = loads(get_env_variable('DJANGO_VM_ACCESS_PROTOCOLS',
"ssh": ["SSH", 22, "tcp"]}'''))
VM_SCHEDULER = 'manager.scheduler'
BROKER_URL = get_env_variable('AMQP_URI')
BROKER_URL=get_env_variable('AMQP_URI')
# Set up periodic firewall tasks
CELERYBEAT_SCHEDULE = {
'blabla': {
'task': 'firewall.tasks.local_tasks.periodic_task',
'schedule': timedelta(seconds=5),
},
}
......@@ -16,3 +16,9 @@ DATABASES = {
"PORT": "",
},
}
SOUTH_TESTS_MIGRATE = False
INSTALLED_APPS += (
'acl.tests',
)
[
{
"pk": 1,
"model": "sites.site",
"fields": {
"domain": "example.com",
"name": "example.com"
}
},
{
"pk": 1,
"model": "vm.lease",
"fields": {
"suspend_interval_seconds": 18000,
"name": "lab",
"delete_interval_seconds": 180000
}
},
{
"pk": 1,
"model": "storage.datastore",
"fields": {
"path": "/disks",
"hostname": "wut",
"name": "diszkek"
}
},
{
"pk": 1,
"model": "storage.disk",
"fields": {
"name": "diszk",
"created": "2013-09-16T09:05:56.926Z",
"modified": "2013-09-19T09:10:25.117Z",
"filename": "disc.img",
"destroyed": null,
"base": null,
"ready": true,
"datastore": 1,
"dev_num": "a",
"type": "raw-rw",
"size": 8589934592
}
},
{
"pk": 1,
"model": "acl.level",
"fields": {
"codename": "owner",
"weight": 2,
"name": "owner",
"content_type": 15
}
},
{
"pk": 2,
"model": "acl.level",
"fields": {
"codename": "operator",
"weight": 1,
"name": "operator",
"content_type": 15
}
},
{
"pk": 3,
"model": "acl.level",
"fields": {
"codename": "user",
"weight": 0,
"name": "user",
"content_type": 15
}
},
{
"pk": 1,
"model": "auth.permission",
"fields": {
"codename": "add_permission",
"name": "Can add permission",
"content_type": 1
}
},
{
"pk": 2,
"model": "auth.permission",
"fields": {
"codename": "change_permission",
"name": "Can change permission",
"content_type": 1
}
},
{
"pk": 3,
"model": "auth.permission",
"fields": {
"codename": "delete_permission",
"name": "Can delete permission",
"content_type": 1
}
},
{
"pk": 4,
"model": "auth.permission",
"fields": {
"codename": "add_group",
"name": "Can add group",
"content_type": 2
}
},
{
"pk": 5,
"model": "auth.permission",
"fields": {
"codename": "change_group",
"name": "Can change group",
"content_type": 2
}
},
{
"pk": 6,
"model": "auth.permission",
"fields": {
"codename": "delete_group",
"name": "Can delete group",
"content_type": 2
}
},
{
"pk": 7,
"model": "auth.permission",
"fields": {
"codename": "add_user",
"name": "Can add user",
"content_type": 3
}
},
{
"pk": 8,
"model": "auth.permission",
"fields": {
"codename": "change_user",
"name": "Can change user",
"content_type": 3
}
},
{
"pk": 9,
"model": "auth.permission",
"fields": {
"codename": "delete_user",
"name": "Can delete user",
"content_type": 3
}
},
{
"pk": 10,
"model": "auth.permission",
"fields": {
"codename": "add_contenttype",
"name": "Can add content type",
"content_type": 4
}
},
{
"pk": 11,
"model": "auth.permission",
"fields": {
"codename": "change_contenttype",
"name": "Can change content type",
"content_type": 4
}
},
{
"pk": 12,
"model": "auth.permission",
"fields": {
"codename": "delete_contenttype",
"name": "Can delete content type",
"content_type": 4
}
},
{
"pk": 13,
"model": "auth.permission",
"fields": {
"codename": "add_session",
"name": "Can add session",
"content_type": 5
}
},
{
"pk": 14,
"model": "auth.permission",
"fields": {
"codename": "change_session",
"name": "Can change session",
"content_type": 5
}
},
{
"pk": 15,
"model": "auth.permission",
"fields": {
"codename": "delete_session",
"name": "Can delete session",
"content_type": 5
}
},
{
"pk": 16,
"model": "auth.permission",
"fields": {
"codename": "add_site",
"name": "Can add site",
"content_type": 6
}
},
{
"pk": 17,
"model": "auth.permission",
"fields": {
"codename": "change_site",
"name": "Can change site",
"content_type": 6
}
},
{
"pk": 18,
"model": "auth.permission",
"fields": {
"codename": "delete_site",
"name": "Can delete site",
"content_type": 6
}
},
{
"pk": 19,
"model": "auth.permission",
"fields": {
"codename": "add_logentry",
"name": "Can add log entry",
"content_type": 7
}
},
{
"pk": 20,
"model": "auth.permission",
"fields": {