You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

110 lines
4.0 KiB
Python

Implement checking the username against LDAP (#64) This commit adds code to allow users that prefer to do user management via LDAP to do so. Since Etebase does not store the password (proof) in an LDAP-compatible fashion, we compromise and instead perform two checks: One while registering an account to see if the username is in the LDAP directory and one whenever the API is accessed. To prevent too many LDAP requests, the result of the request is cached for a configurable number of hours. Make sure you have python-ldap installed and can successfully import ldap. Then, if you use the easy config, add the following to your config: ``` ; [...] ; Regular etesync configuration [ldap] bind_dn = <Your LDAP "user" to bind as. See Note 1> bind_pw = <The password to authenticate as your bind user> ; Or if you have the password in a file: ; bind_pw_file = /path/to/the/file.txt server = <The URL to your LDAP server> search_base = <Your search base> filter = <Your LDAP filter query. See Note 2> ; In case a cache TTL of 1 hour is too short for you, set `cache_ttl` to the preferred ; amount of hours a cache entry should be viewed as valid: ; cache_ttl = 5 ``` With this config, I am able to make the EteSync server check with my LDAP server if a user should be able to login or register. Note that if a user is allowed to login or register, the password of the LDAP user will be ignored. This LDAP patch is nothing more than an additional check before the actual authentication. A successful LDAP check will be cached, if not configured (correctly), for one hour, after which the LDAP query will be performed again. Note 1: This commit only works with a bind user. Note 2: The query must be specified. If an LDAP query returns more than one or no result, then the authentication fails. If your query needs to include the username that currently tries to perform a login or registration, you can use %%s, which will be substituted for the used username.
2 years ago
import logging
from datetime import timedelta

import ldap
from django.conf import settings
from django.core.exceptions import PermissionDenied as DjangoPermissionDenied
from django.utils import timezone
from fastapi import Depends
from ldap.filter import escape_filter_chars

from etebase_server.django.utils import CallbackContext
from etebase_server.fastapi.dependencies import get_authenticated_user
from etebase_server.fastapi.exceptions import PermissionDenied as FastAPIPermissionDenied
from etebase_server.myauth.models import get_typed_user_model, UserType
# User model class, resolved once at import time; create_user() below calls
# User.objects.create_user() on it.
User = get_typed_user_model()
def ldap_setting(name, default):
    """Return the Django setting ``LDAP_<name>``, or *default* when it is unset."""
    setting_key = "LDAP_{}".format(name)
    return getattr(settings, setting_key, default)
class LDAPConnection:
    """Shared helper that checks whether a username exists in the LDAP directory.

    On construction the connection is bound with the configured bind user
    (``LDAP_BIND_DN`` plus ``LDAP_BIND_PW`` or the contents of
    ``LDAP_BIND_PW_FILE``).  Positive lookups are remembered for
    ``LDAP_CACHE_TTL`` hours so that not every request queries the directory.
    Obtain the object through :meth:`get_instance` instead of instantiating
    the class directly.
    """

    __instance__ = None
    # Username -> point in time until which the positive lookup stays valid.
    # Kept on the class so that the cache is shared by every instance.
    __user_cache = {}

    @classmethod
    def get_instance(cls):
        """Return the shared connection, creating (and binding) it on first use."""
        if cls.__instance__ is None:
            # Store the instance: without this every call would open and bind
            # a brand-new LDAP connection.
            cls.__instance__ = cls()
        return cls.__instance__

    @staticmethod
    def _parse_cache_ttl(raw):
        """Convert the configured cache TTL (in hours) to an int.

        An unset value (``None`` or ``""``) silently yields the default of one
        hour; a value that cannot be converted is logged and also yields one.
        """
        if raw is None or raw == "":
            return 1
        try:
            return int(raw)
        except (TypeError, ValueError):
            logging.error("Invalid value for cache_ttl. Defaulting to 1 hour")
            return 1

    @staticmethod
    def _describe_error(err):
        """Return a short human-readable description of an LDAP exception.

        python-ldap documents the details of an ``LDAPError`` as a dict passed
        as the first exception argument (key ``"desc"``), not as attributes,
        so read it from there and fall back to ``str(err)`` otherwise.
        """
        info = err.args[0] if err.args else None
        if isinstance(info, dict) and "desc" in info:
            return info["desc"]
        return str(err)

    def __init__(self):
        # Cache some settings
        self.__LDAP_FILTER = ldap_setting("FILTER", "")
        self.__LDAP_SEARCH_BASE = ldap_setting("SEARCH_BASE", "")
        # The time a cache entry is valid (in hours)
        self.__LDAP_CACHE_TTL = self._parse_cache_ttl(ldap_setting("CACHE_TTL", ""))

        # An inline password wins; otherwise fall back to the password file.
        password = ldap_setting("BIND_PW", "")
        if not password:
            pw_file = ldap_setting("BIND_PW_FILE", "")
            if pw_file:
                with open(pw_file, "r") as f:
                    password = f.read().replace("\n", "")

        self.__ldap_connection = ldap.initialize(ldap_setting("SERVER", ""))
        try:
            self.__ldap_connection.simple_bind_s(ldap_setting("BIND_DN", ""), password)
        except ldap.LDAPError as err:
            # Best effort: a failed bind is logged, the object is still created.
            logging.error("LDAP Error occuring during bind: %s", self._describe_error(err))

    def __is_cache_valid(self, username):
        """Returns True if the cache entry is still valid. Returns False otherwise."""
        valid_until = self.__user_cache.get(username)
        return valid_until is not None and timezone.now() <= valid_until

    def __remove_cache(self, username):
        """Forget the cache entry for *username* (no-op when there is none)."""
        self.__user_cache.pop(username, None)

    def has_user(self, username):
        """
        Return True when exactly one LDAP entry matches *username*.

        Since we don't care about the password and do authentication
        another way, all we care about is whether the user exists.
        Every ``%s`` in the configured filter is replaced by the username.
        Returns False when the search yields zero or several entries or when
        the LDAP query fails (the failure is logged).
        """
        if self.__is_cache_valid(username):
            return True
        # Drop a stale entry (if any) before asking the directory again.
        self.__remove_cache(username)

        # The username is user-supplied: escape it so characters such as
        # "*", "(" or ")" cannot alter the structure of the filter.
        filterstr = self.__LDAP_FILTER.replace("%s", escape_filter_chars(username))
        try:
            result = self.__ldap_connection.search_s(
                self.__LDAP_SEARCH_BASE, ldap.SCOPE_SUBTREE, filterstr=filterstr
            )
        except ldap.NO_RESULTS_RETURNED:
            # We handle the specific error first and then the generic error, as
            # we may expect ldap.NO_RESULTS_RETURNED, but not any other error
            return False
        except ldap.LDAPError as err:
            logging.error(
                "Error occured while performing an LDAP query: %s", self._describe_error(err)
            )
            return False

        if len(result) == 1:
            self.__user_cache[username] = timezone.now() + timedelta(hours=self.__LDAP_CACHE_TTL)
            return True
        return False
def is_user_in_ldap(user: UserType = Depends(get_authenticated_user)):
    """Reject the request unless the authenticated user exists in LDAP.

    Raises FastAPIPermissionDenied when the shared LDAPConnection does not
    find ``user.username``; returns None otherwise.
    """
    directory = LDAPConnection.get_instance()
    if directory.has_user(user.username):
        return
    raise FastAPIPermissionDenied(detail="User not in LDAP directory.")
def create_user(context: CallbackContext, *args, **kwargs):
    """
    Create a user, but only when ``kwargs["username"]`` is found in the
    configured LDAP directory.

    Raises DjangoPermissionDenied when the lookup fails; otherwise returns the
    result of ``User.objects.create_user(*args, **kwargs)``.  ``context`` is
    accepted but not used here.
    """
    directory = LDAPConnection.get_instance()
    requested_name = kwargs["username"]
    if directory.has_user(requested_name):
        return User.objects.create_user(*args, **kwargs)
    raise DjangoPermissionDenied("User not in the LDAP directory.")