Commit f0530049 by Bach Dániel

dashboard: rewrite AclUpdateView

parent 946970a3
...@@ -52,9 +52,11 @@ ...@@ -52,9 +52,11 @@
</a> </a>
</td> </td>
<td> <td>
<select class="form-control" name="perm-u-{{i.user.id}}"> <select class="form-control" name="perm-u-{{i.user.id}}"{% if i.level not in acl.allowed_levels %} disabled{% endif %}>
{% for id, name in acl.levels %} {% for id, name in acl.levels %}
<option{%if id = i.level%} selected="selected"{%endif%} value="{{id}}">{{name}}</option> <option{%if id = i.level%} selected="selected"{%endif%}
{% if id not in acl.allowed_levels %} disabled{% endif %}
value="{{id}}">{{name}}</option>
{% endfor %} {% endfor %}
</select> </select>
</td> </td>
...@@ -72,9 +74,11 @@ ...@@ -72,9 +74,11 @@
</a> </a>
</td> </td>
<td> <td>
<select class="form-control" name="perm-g-{{i.group.id}}"> <select class="form-control" name="perm-g-{{i.group.id}}{% if i.level not in acl.allowed_levels %} disabled{% endif %}">
{% for id, name in acl.levels %} {% for id, name in acl.levels %}
<option{%if id = i.level%} selected="selected"{%endif%} value="{{id}}">{{name}}</option> <option{%if id = i.level%} selected="selected"{%endif%}
{% if id not in acl.allowed_levels %} disabled{% endif %}
value="{{id}}">{{name}}</option>
{% endfor %} {% endfor %}
</select> </select>
</td> </td>
...@@ -87,7 +91,7 @@ ...@@ -87,7 +91,7 @@
<td><input type="text" class="form-control" name="perm-new-name" <td><input type="text" class="form-control" name="perm-new-name"
placeholder="{% trans "Name of group or user" %}"></td> placeholder="{% trans "Name of group or user" %}"></td>
<td><select class="form-control" name="perm-new"> <td><select class="form-control" name="perm-new">
{% for id, name in acl.levels %} {% for id, name in acl.allowed_levels %}
<option value="{{id}}">{{name}}</option> <option value="{{id}}">{{name}}</option>
{% endfor %} {% endfor %}
</select></td><td></td> </select></td><td></td>
...@@ -95,7 +99,7 @@ ...@@ -95,7 +99,7 @@
</tbody> </tbody>
</table> </table>
<div class="form-actions"> <div class="form-actions">
<button type="submit" class="btn btn-success {% if not is_owner %}disabled{% endif %}">{% trans "Save" %}</button> <button type="submit" class="btn btn-success">{% trans "Save" %}</button>
</div> </div>
</form> </form>
</div> </div>
......
...@@ -1263,7 +1263,7 @@ class GroupDetailTest(LoginMixin, TestCase): ...@@ -1263,7 +1263,7 @@ class GroupDetailTest(LoginMixin, TestCase):
str(self.g1.pk) + '/acl/', str(self.g1.pk) + '/acl/',
{'perm-new-name': 'user3', 'perm-new': 'owner'}) {'perm-new-name': 'user3', 'perm-new': 'owner'})
self.assertEqual(acl_users, len(gp.get_users_with_level())) self.assertEqual(acl_users, len(gp.get_users_with_level()))
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 302)
def test_superuser_add_acluser_to_group(self): def test_superuser_add_acluser_to_group(self):
c = Client() c = Client()
...@@ -1306,7 +1306,7 @@ class GroupDetailTest(LoginMixin, TestCase): ...@@ -1306,7 +1306,7 @@ class GroupDetailTest(LoginMixin, TestCase):
str(self.g1.pk) + '/acl/', str(self.g1.pk) + '/acl/',
{'perm-new-name': 'group2', 'perm-new': 'owner'}) {'perm-new-name': 'group2', 'perm-new': 'owner'})
self.assertEqual(acl_groups, len(gp.get_groups_with_level())) self.assertEqual(acl_groups, len(gp.get_groups_with_level()))
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 302)
def test_superuser_add_aclgroup_to_group(self): def test_superuser_add_aclgroup_to_group(self):
c = Client() c = Client()
...@@ -1792,7 +1792,7 @@ class AclViewTest(LoginMixin, TestCase): ...@@ -1792,7 +1792,7 @@ class AclViewTest(LoginMixin, TestCase):
'perm-new': "", 'perm-new': "",
}) })
self.assertTrue((self.u1, "user") in inst.get_users_with_level()) self.assertTrue((self.u1, "user") in inst.get_users_with_level())
self.assertEqual(resp.status_code, 403) self.assertEqual(resp.status_code, 302)
def test_instance_original_owner_access_revoke(self): def test_instance_original_owner_access_revoke(self):
c = Client() c = Client()
...@@ -1836,7 +1836,7 @@ class AclViewTest(LoginMixin, TestCase): ...@@ -1836,7 +1836,7 @@ class AclViewTest(LoginMixin, TestCase):
'perm-new': "", 'perm-new': "",
}) })
self.assertTrue((self.u1, "user") in tmpl.get_users_with_level()) self.assertTrue((self.u1, "user") in tmpl.get_users_with_level())
self.assertEqual(resp.status_code, 403) self.assertEqual(resp.status_code, 302)
def test_template_original_owner_access_revoke(self): def test_template_original_owner_access_revoke(self):
c = Client() c = Client()
......
...@@ -28,14 +28,13 @@ import requests ...@@ -28,14 +28,13 @@ import requests
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import User, Group from django.contrib.auth.models import User, Group
from django.contrib.auth.views import login, redirect_to_login from django.contrib.auth.views import login, redirect_to_login
from django.contrib.messages import warning
from django.contrib.messages.views import SuccessMessageMixin from django.contrib.messages.views import SuccessMessageMixin
from django.core.exceptions import ( from django.core.exceptions import (
PermissionDenied, SuspiciousOperation, PermissionDenied, SuspiciousOperation,
) )
from django.core import signing from django.core import signing
from django.core.urlresolvers import reverse, reverse_lazy from django.core.urlresolvers import reverse, reverse_lazy
from django.db.models import Count from django.db.models import Count, Q
from django.http import HttpResponse, HttpResponseRedirect, Http404 from django.http import HttpResponse, HttpResponseRedirect, Http404
from django.shortcuts import redirect, render, get_object_or_404 from django.shortcuts import redirect, render, get_object_or_404
from django.views.decorators.http import require_GET, require_POST from django.views.decorators.http import require_GET, require_POST
...@@ -219,16 +218,6 @@ class IndexView(LoginRequiredMixin, TemplateView): ...@@ -219,16 +218,6 @@ class IndexView(LoginRequiredMixin, TemplateView):
return context return context
def get_acl_data(obj, url):
levels = obj.ACL_LEVELS
users = obj.get_users_with_level()
users = [{'user': u, 'level': l} for u, l in users]
groups = obj.get_groups_with_level()
groups = [{'group': g, 'level': l} for g, l in groups]
return {'users': users, 'groups': groups, 'levels': levels,
'url': reverse(url, args=[obj.pk])}
class CheckedDetailView(LoginRequiredMixin, DetailView): class CheckedDetailView(LoginRequiredMixin, DetailView):
read_level = 'user' read_level = 'user'
...@@ -292,7 +281,8 @@ class VmDetailView(CheckedDetailView): ...@@ -292,7 +281,8 @@ class VmDetailView(CheckedDetailView):
pk__in=Interface.objects.filter( pk__in=Interface.objects.filter(
instance=self.get_object()).values_list("vlan", flat=True) instance=self.get_object()).values_list("vlan", flat=True)
).all() ).all()
context['acl'] = get_acl_data(instance, 'dashboard.views.vm-acl') context['acl'] = AclUpdateView.get_acl_data(
instance, self.request.user, 'dashboard.views.vm-acl')
context['os_type_icon'] = instance.os_type.replace("unknown", context['os_type_icon'] = instance.os_type.replace("unknown",
"question") "question")
# ipv6 infos # ipv6 infos
...@@ -965,7 +955,8 @@ class GroupDetailView(CheckedDetailView): ...@@ -965,7 +955,8 @@ class GroupDetailView(CheckedDetailView):
context['users'] = self.object.user_set.all() context['users'] = self.object.user_set.all()
context['future_users'] = FutureMember.objects.filter( context['future_users'] = FutureMember.objects.filter(
group=self.object) group=self.object)
context['acl'] = get_acl_data(self.object.profile, context['acl'] = AclUpdateView.get_acl_data(
self.object.profile, self.request.user,
'dashboard.views.group-acl') 'dashboard.views.group-acl')
context['group_profile_form'] = GroupProfileUpdate.get_form_object( context['group_profile_form'] = GroupProfileUpdate.get_form_object(
self.request, self.object.profile) self.request, self.object.profile)
...@@ -1009,7 +1000,7 @@ class GroupDetailView(CheckedDetailView): ...@@ -1009,7 +1000,7 @@ class GroupDetailView(CheckedDetailView):
FutureMember.objects.get_or_create(org_id=name, FutureMember.objects.get_or_create(org_id=name,
group=self.object) group=self.object)
else: else:
warning(request, _('User "%s" not found.') % name) messages.warning(request, _('User "%s" not found.') % name)
def __add_list(self, request): def __add_list(self, request):
if not self.get_has_level()(request.user, 'operator'): if not self.get_has_level()(request.user, 'operator'):
...@@ -1054,56 +1045,132 @@ class GroupPermissionsView(SuperuserRequiredMixin, UpdateView): ...@@ -1054,56 +1045,132 @@ class GroupPermissionsView(SuperuserRequiredMixin, UpdateView):
class AclUpdateView(LoginRequiredMixin, View, SingleObjectMixin): class AclUpdateView(LoginRequiredMixin, View, SingleObjectMixin):
def send_success_message(self, whom, old_level, new_level):
if old_level and new_level:
msg = _("Acl user/group %(w)s successfully modified.")
elif not old_level and new_level:
msg = _("Acl user/group %(w)s successfully added.")
elif old_level and not new_level:
msg = _("Acl user/group %(w)s successfully removed.")
if msg:
messages.success(self.request, msg % {'w': whom})
def get_level(self, whom):
for u, level in self.acl_data:
if u == whom:
return level
return None
def post(self, request, *args, **kwargs): @classmethod
instance = self.get_object() def get_acl_data(cls, obj, user, url):
if not (instance.has_level(request.user, "owner") or levels = obj.ACL_LEVELS
getattr(instance, 'owner', None) == request.user): allowed_levels = list(l[0] for l in obj.ACL_LEVELS
logger.warning('Tried to set permissions of %s by non-owner %s.', if cls.has_next_level(user, obj, l[0]))
unicode(instance), unicode(request.user)) is_owner = 'owner' in allowed_levels
raise PermissionDenied()
self.set_levels(request, instance)
self.remove_levels(request, instance)
self.add_levels(request, instance)
return redirect("%s#access" % instance.get_absolute_url())
def set_levels(self, request, instance): allowed_users = cls.get_allowed_users(user, is_owner)
for key, value in request.POST.items(): allowed_groups = cls.get_allowed_groups(user, is_owner)
m = re.match('perm-([ug])-(\d+)', key)
if m: user_levels = list(
typ, id = m.groups() {'user': u, 'level': l} for u, l in obj.get_users_with_level()
entity = {'u': User, 'g': Group}[typ].objects.get(id=id) if is_owner or u == user or u in allowed_users)
if getattr(instance, "owner", None) == entity:
group_levels = list(
{'group': g, 'level': l} for g, l in obj.get_groups_with_level()
if is_owner or g in allowed_groups)
return {'users': user_levels,
'groups': group_levels,
'levels': levels,
'allowed_levels': allowed_levels,
'url': reverse(url, args=[obj.pk])}
@classmethod
def has_next_level(self, user, instance, level):
# TODO
levels = zip(*instance.ACL_LEVELS)[0]
try:
i = levels.index(level)
except ValueError:
i = 0
if i < (len(levels) - 1):
i += 1
next_level = levels[i]
return instance.has_level(user, next_level)
@classmethod
def get_allowed_groups(cls, user, is_owner):
if is_owner:
return Group.objects.all()
else:
profiles = GroupProfile.get_objects_with_level('owner', user)
return Group.objects.filter(groupprofile__in=profiles).distinct()
@classmethod
def get_allowed_users(cls, user, is_owner):
if is_owner:
return User.objects.all()
else:
groups = cls.get_allowed_groups(user, is_owner)
return User.objects.filter(
Q(groups__in=groups) | Q(pk=user.pk)).distinct()
def check_auth(self, whom, old_level, new_level):
if isinstance(whom, Group):
if whom not in AclUpdateView.get_allowed_groups(self.request.user,
self.is_owner):
return False
elif isinstance(whom, User):
if whom not in AclUpdateView.get_allowed_users(self.request.user,
self.is_owner):
return False
return (
AclUpdateView.has_next_level(self.request.user,
self.instance, new_level) and
AclUpdateView.has_next_level(self.request.user,
self.instance, old_level))
def set_level(self, whom, new_level):
user = self.request.user
old_level = self.get_level(whom)
if old_level == new_level:
return
if getattr(self.instance, "owner", None) == whom:
logger.info("Tried to set owner's acl level for %s by %s.", logger.info("Tried to set owner's acl level for %s by %s.",
unicode(instance), unicode(request.user)) unicode(self.instance), unicode(user))
continue
instance.set_level(entity, value)
logger.info("Set %s's acl level for %s to %s by %s.",
unicode(entity), unicode(instance),
value, unicode(request.user))
def remove_levels(self, request, instance):
for key, value in request.POST.items():
if key.startswith("remove"):
typ = key[7:8] # len("remove-")
id = key[9:] # len("remove-x-")
entity = {'u': User, 'g': Group}[typ].objects.get(id=id)
if getattr(instance, "owner", None) == entity:
logger.info("Tried to remove owner from %s by %s.",
unicode(instance), unicode(request.user))
msg = _("The original owner cannot be removed, however " msg = _("The original owner cannot be removed, however "
"you can transfer ownership.") "you can transfer ownership.")
messages.warning(request, msg) if not getattr(self, 'hide_messages', False):
continue messages.warning(self.request, msg)
instance.set_level(entity, None) elif self.check_auth(whom, old_level, new_level):
logger.info("Revoked %s's access to %s by %s.", logger.info(
unicode(entity), unicode(instance), u"Set %s's acl level for %s to %s by %s.", unicode(whom),
unicode(request.user)) unicode(self.instance), new_level, unicode(user))
if not getattr(self, 'hide_messages', False):
self.send_success_message(whom, old_level, new_level)
self.instance.set_level(whom, new_level)
else:
logger.warning(
u"Tried to set %s's acl_level for %s (%s->%s) by %s.",
unicode(whom), unicode(self.instance), old_level, new_level,
unicode(user))
def set_or_remove_levels(self):
for key, value in self.request.POST.items():
m = re.match('(perm|remove)-([ug])-(\d+)', key)
if m:
cmd, typ, id = m.groups()
if cmd == 'remove':
value = None
entity = {'u': User, 'g': Group}[typ].objects.get(id=id)
self.set_level(entity, value)
def add_levels(self, request, instance): def add_levels(self):
name = request.POST['perm-new-name'] name = self.request.POST.get('perm-new-name', None)
value = request.POST['perm-new'] value = self.request.POST.get('perm-new', None)
if not name: if not name or not value:
return return
try: try:
entity = search_user(name) entity = search_user(name)
...@@ -1112,13 +1179,18 @@ class AclUpdateView(LoginRequiredMixin, View, SingleObjectMixin): ...@@ -1112,13 +1179,18 @@ class AclUpdateView(LoginRequiredMixin, View, SingleObjectMixin):
try: try:
entity = Group.objects.get(name=name) entity = Group.objects.get(name=name)
except Group.DoesNotExist: except Group.DoesNotExist:
warning(request, _('User or group "%s" not found.') % name) messages.warning(
request, _('User or group "%s" not found.') % name)
return return
self.set_level(request, instance, entity, value)
instance.set_level(entity, value) def post(self, request, *args, **kwargs):
logger.info("Set %s's new acl level for %s to %s by %s.", instance = self.get_object()
unicode(entity), unicode(instance), self.acl_data = (instance.get_users_with_level() +
value, unicode(request.user)) instance.get_groups_with_level())
self.set_or_remove_levels(request, instance)
self.add_levels(request, instance)
return redirect("%s#access" % instance.get_absolute_url())
class TemplateAclUpdateView(AclUpdateView): class TemplateAclUpdateView(AclUpdateView):
...@@ -1133,9 +1205,8 @@ class TemplateAclUpdateView(AclUpdateView): ...@@ -1133,9 +1205,8 @@ class TemplateAclUpdateView(AclUpdateView):
post_for_disk['perm-new'] = 'user' post_for_disk['perm-new'] = 'user'
request.POST = post_for_disk request.POST = post_for_disk
for d in template.disks.all(): for d in template.disks.all():
self.set_levels(request, d) self.set_or_remove_levels(request, d)
self.add_levels(request, d) self.add_levels(request, d)
self.remove_levels(request, d)
return retval return retval
...@@ -1291,7 +1362,8 @@ class TemplateDetail(LoginRequiredMixin, SuccessMessageMixin, UpdateView): ...@@ -1291,7 +1362,8 @@ class TemplateDetail(LoginRequiredMixin, SuccessMessageMixin, UpdateView):
def get_context_data(self, **kwargs): def get_context_data(self, **kwargs):
obj = self.get_object() obj = self.get_object()
context = super(TemplateDetail, self).get_context_data(**kwargs) context = super(TemplateDetail, self).get_context_data(**kwargs)
context['acl'] = get_acl_data(obj, 'dashboard.views.template-acl') context['acl'] = AclUpdateView.get_acl_data(
obj, self.request.user, 'dashboard.views.template-acl')
context['disks'] = obj.disks.all() context['disks'] = obj.disks.all()
context['is_owner'] = obj.has_level(self.request.user, 'owner') context['is_owner'] = obj.has_level(self.request.user, 'owner')
return context return context
...@@ -2284,7 +2356,8 @@ class LeaseDetail(LoginRequiredMixin, SuperuserRequiredMixin, ...@@ -2284,7 +2356,8 @@ class LeaseDetail(LoginRequiredMixin, SuperuserRequiredMixin,
def get_context_data(self, *args, **kwargs): def get_context_data(self, *args, **kwargs):
obj = self.get_object() obj = self.get_object()
context = super(LeaseDetail, self).get_context_data(*args, **kwargs) context = super(LeaseDetail, self).get_context_data(*args, **kwargs)
context['acl'] = get_acl_data(obj, 'dashboard.views.lease-acl') context['acl'] = AclUpdateView.get_acl_data(
obj, self.request.user, 'dashboard.views.lease-acl')
return context return context
def get_success_url(self): def get_success_url(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment