Add permissions.

master
Tom Hacohen 4 years ago
parent df19887af7
commit c7b8b0373a

@ -12,10 +12,10 @@ from pydantic import BaseModel
from django_etebase import models from django_etebase import models
from .authentication import get_authenticated_user from .authentication import get_authenticated_user
from .exceptions import ValidationError, transform_validation_error from .exceptions import ValidationError, transform_validation_error, PermissionDenied
from .msgpack import MsgpackRoute, MsgpackResponse from .msgpack import MsgpackRoute, MsgpackResponse
from .stoken_handler import filter_by_stoken_and_limit, filter_by_stoken, get_stoken_obj, get_queryset_stoken from .stoken_handler import filter_by_stoken_and_limit, filter_by_stoken, get_stoken_obj, get_queryset_stoken
from .utils import get_object_or_404, Context, Prefetch, PrefetchQuery from .utils import get_object_or_404, Context, Prefetch, PrefetchQuery, is_collection_admin
User = get_user_model() User = get_user_model()
collection_router = APIRouter(route_class=MsgpackRoute) collection_router = APIRouter(route_class=MsgpackRoute)
@ -209,6 +209,26 @@ def get_item_queryset(collection: models.Collection = Depends(get_collection)) -
return queryset return queryset
# permissions
def verify_collection_admin(
collection: models.Collection = Depends(get_collection), user: User = Depends(get_authenticated_user)
):
if not is_collection_admin(collection, user):
raise PermissionDenied("admin_access_required", "Only collection admins can perform this operation.")
def has_write_access(
collection: models.Collection = Depends(get_collection), user: User = Depends(get_authenticated_user)
):
member = collection.members.get(user=user)
if member.accessLevel == models.AccessLevels.READ_ONLY:
raise PermissionDenied("no_write_access", "You need write access to write to this collection")
# paths
@collection_router.post("/list_multi/") @collection_router.post("/list_multi/")
async def list_multi( async def list_multi(
data: ListMulti, data: ListMulti,
@ -489,14 +509,14 @@ def fetch_updates(
return MsgpackResponse(ret) return MsgpackResponse(ret)
@collection_router.post("/{collection_uid}/item/transaction/") @collection_router.post("/{collection_uid}/item/transaction/", dependencies=[Depends(has_write_access)])
def item_transaction( def item_transaction(
collection_uid: str, data: ItemBatchIn, stoken: t.Optional[str] = None, user: User = Depends(get_authenticated_user) collection_uid: str, data: ItemBatchIn, stoken: t.Optional[str] = None, user: User = Depends(get_authenticated_user)
): ):
return item_bulk_common(data, user, stoken, collection_uid, validate_etag=True) return item_bulk_common(data, user, stoken, collection_uid, validate_etag=True)
@collection_router.post("/{collection_uid}/item/batch/") @collection_router.post("/{collection_uid}/item/batch/", dependencies=[Depends(has_write_access)])
def item_batch( def item_batch(
collection_uid: str, data: ItemBatchIn, stoken: t.Optional[str] = None, user: User = Depends(get_authenticated_user) collection_uid: str, data: ItemBatchIn, stoken: t.Optional[str] = None, user: User = Depends(get_authenticated_user)
): ):

@ -12,7 +12,7 @@ from .msgpack import MsgpackResponse
from .utils import get_object_or_404 from .utils import get_object_or_404
from .stoken_handler import filter_by_stoken_and_limit from .stoken_handler import filter_by_stoken_and_limit
from .collection import collection_router, get_collection from .collection import collection_router, get_collection, verify_collection_admin
User = get_user_model() User = get_user_model()
default_queryset: QuerySet = models.CollectionMember.objects.all() default_queryset: QuerySet = models.CollectionMember.objects.all()
@ -48,7 +48,9 @@ class MemberListResponse(BaseModel):
done: bool done: bool
@collection_router.get("/{collection_uid}/member/", response_model=MemberListResponse) @collection_router.get(
"/{collection_uid}/member/", response_model=MemberListResponse, dependencies=[Depends(verify_collection_admin)]
)
def member_list( def member_list(
iterator: t.Optional[str] = None, iterator: t.Optional[str] = None,
limit: int = 50, limit: int = 50,
@ -68,14 +70,22 @@ def member_list(
return MsgpackResponse(ret) return MsgpackResponse(ret)
@collection_router.delete("/{collection_uid}/member/{username}/", status_code=status.HTTP_204_NO_CONTENT) @collection_router.delete(
"/{collection_uid}/member/{username}/",
status_code=status.HTTP_204_NO_CONTENT,
dependencies=[Depends(verify_collection_admin)],
)
def member_delete( def member_delete(
obj: models.CollectionMember = Depends(get_member), obj: models.CollectionMember = Depends(get_member),
): ):
obj.revoke() obj.revoke()
@collection_router.patch("/{collection_uid}/member/{username}/", status_code=status.HTTP_204_NO_CONTENT) @collection_router.patch(
"/{collection_uid}/member/{username}/",
status_code=status.HTTP_204_NO_CONTENT,
dependencies=[Depends(verify_collection_admin)],
)
def member_patch( def member_patch(
data: CollectionMemberModifyAccessLevelIn, data: CollectionMemberModifyAccessLevelIn,
instance: models.CollectionMember = Depends(get_member), instance: models.CollectionMember = Depends(get_member),
@ -88,7 +98,10 @@ def member_patch(
instance.save() instance.save()
@collection_router.post("/{collection_uid}/member/leave/", status_code=status.HTTP_204_NO_CONTENT) @collection_router.post(
"/{collection_uid}/member/leave/",
status_code=status.HTTP_204_NO_CONTENT,
)
def member_leave(user: User = Depends(get_authenticated_user), collection: models.Collection = Depends(get_collection)): def member_leave(user: User = Depends(get_authenticated_user), collection: models.Collection = Depends(get_collection)):
obj = get_object_or_404(collection.members, user=user) obj = get_object_or_404(collection.members, user=user)
obj.revoke() obj.revoke()

Loading…
Cancel
Save