py.lib.aw_web_tools

Yohn Y. 2024-02-27 Child:b9fd029be707

5:4d3b509e0967 Go to Latest

py.lib.aw_web_tools/src/aw_web_tools/id_helper.py

+ Реализация плагина идентификации . Сводим все классы исключений к одному предку для удобства

History
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/aw_web_tools/id_helper.py	Tue Feb 27 19:29:01 2024 +0300
     1.3 @@ -0,0 +1,353 @@
     1.4 +# coding: utf-8
     1.5 +"""\
     1.6 +Модуль вспомогательных средств обеспечения идентификации пользователей
     1.7 +"""
     1.8 +import bottle
     1.9 +from string import ascii_letters, digits
    1.10 +from typing import Optional, Dict, Iterable, Any
    1.11 +from . import btle_tools as tools
    1.12 +from . import Error
    1.13 +from . import jwt
    1.14 +from .cookie import Cookie
    1.15 +
    1.16 +
    1.17 +SESSION_TIMEOUT = 86400    # Продолжительность сессии по умолчанию.
    1.18 +
    1.19 +
    1.20 +class IDError(Error):
    1.21 +    """\
    1.22 +    Общий класс ошибок идентификации
    1.23 +    """
    1.24 +
    1.25 +
    1.26 +class IDCheckError(IDError):
    1.27 +    """\
    1.28 +    Ошибка проверки ID
    1.29 +    """
    1.30 +
    1.31 +
    1.32 +class IDNotFound(IDCheckError):
    1.33 +    """\
    1.34 +    Запрос не идентифицирован
    1.35 +    """
    1.36 +    def __init__(self):
    1.37 +        super().__init__('Запрос не идентифицирован')
    1.38 +
    1.39 +
    1.40 +class UIDError(IDError):
    1.41 +    """\
    1.42 +    Ошибки при работе с классом пользовательского идентификатора
    1.43 +    """
    1.44 +
    1.45 +
    1.46 +class UID(object):
    1.47 +    """\
    1.48 +    Класс пользовательского идентификатора.
    1.49 +
    1.50 +    Обеспечивает хранение идентификационной информации о пользователе.
    1.51 +    """
    1.52 +    ENV_NAME = 'AW_UID'  # Идентификатор для хранения объекта в контексте запроса.
    1.53 +
    1.54 +    def __init__(self, uname: str, acc_level: int = -1,
    1.55 +                 sess_id: Optional[str] = None,
    1.56 +                 acc_tags: Optional[Iterable[str]] = None,
    1.57 +                 user_meta: Optional[Dict[str, str]] = None
    1.58 +                 ):
    1.59 +        """\
    1.60 +        :param uname: Имя пользователя внутри системы
    1.61 +        :param acc_level: Уровень допуска пользователя. Интерпретация целиком на стороне приложения.
    1.62 +        :param sess_id: Идентификатор сессии пользователя
    1.63 +        :param acc_tags: Строковые константы, определяющие конкретное множество доступных пользователю ресурсов.
    1.64 +                        Интерпретация целиком на стороне приложения
    1.65 +        :param user_meta: Иная полезная приложению информация, сохранённая в виде тег-значение
    1.66 +        """
    1.67 +        self.name = uname
    1.68 +        self.level = acc_level
    1.69 +        self.sess_id = sess_id
    1.70 +        self._access = acc_tags
    1.71 +        self._meta = user_meta
    1.72 +
    1.73 +    def get_fp_id(self) -> str:
    1.74 +        """\
    1.75 +        Преобразует объект в форму, пригодную для получения подписи сессии
    1.76 +        """
    1.77 +        buf = ':'.join(sorted(map(lambda x: str(x).lower().strip(), self.get_access())))
    1.78 +        if self.level >= 0:
    1.79 +            buf += f':{self.level}'
    1.80 +
    1.81 +        return f'{self.name}:{buf}'
    1.82 +
    1.83 +    def __str__(self):
    1.84 +        return self.name
    1.85 +
    1.86 +    def get_access(self):
    1.87 +        """\
    1.88 +        Получить теги доступа пользователя, если они имелись
    1.89 +        """
    1.90 +        if self._access:
    1.91 +            return [i for i in self._access]
    1.92 +
    1.93 +        else:
    1.94 +            return []
    1.95 +
    1.96 +    def get_meta(self):
    1.97 +        """\
    1.98 +        Получить метаданные пользователя, если они имелись.
    1.99 +        """
   1.100 +        if self._meta:
   1.101 +            return dict([(k, v) for k, v in self._meta.items()])
   1.102 +
   1.103 +        else:
   1.104 +            return dict()
   1.105 +
   1.106 +    def to_dict(self) -> Dict[str, str]:
   1.107 +        """\
   1.108 +        Преобразование текущего состояния объекта в массив
   1.109 +        """
   1.110 +        res = dict(
   1.111 +            n=self.name,
   1.112 +            id=self.sess_id
   1.113 +        )
   1.114 +
   1.115 +        if self.level >= 0:
   1.116 +            res['al'] = self.level
   1.117 +
   1.118 +        if self._access:
   1.119 +            res['a'] = self.get_access()
   1.120 +
   1.121 +        if self._meta:
   1.122 +            res['m'] = self.get_meta()
   1.123 +
   1.124 +        return res
   1.125 +
   1.126 +    def to_env(self, request: bottle.BaseRequest = bottle.request):
   1.127 +        """\
   1.128 +        Сохранение своего экземпляра в контекст запроса
   1.129 +        """
   1.130 +        tools.set_env(self.ENV_NAME, self, request=request)
   1.131 +
   1.132 +    @classmethod
   1.133 +    def from_dict(cls, d: Dict[str, Any]):
   1.134 +        """\
   1.135 +        Разбор словаря, созданного методом ``to_dict`` и восстановление из него экземпляра объекта,
   1.136 +        подобного сохранённому
   1.137 +        """
   1.138 +        params = dict()
   1.139 +
   1.140 +        # User name
   1.141 +        uname = d.get('n')
   1.142 +        try:
   1.143 +            uname = str(uname) if uname is not None else ''
   1.144 +
   1.145 +        except (ValueError, TypeError) as e:
   1.146 +            raise UIDError(f'Не удалось представить имя пользователя как строку: name="{uname}" err="{e}"')
   1.147 +
   1.148 +        if not uname:
   1.149 +            raise UIDError(f'Идентификатор пользователя не задан в переданном словаре')
   1.150 +
   1.151 +        params['uname'] = uname
   1.152 +
   1.153 +        # Session Identity
   1.154 +        sess_id = d.get('id')
   1.155 +        if sess_id is not None:
   1.156 +            try:
   1.157 +                sess_id = str(sess_id)
   1.158 +
   1.159 +            except (TypeError, ValueError) as e:
   1.160 +                UIDError(f'Не удалось представить идентификатор сессии как строку: '
   1.161 +                         f' sess_id="{sess_id}" '
   1.162 +                         f' err="{e}"')
   1.163 +
   1.164 +        params['sess_id'] = sess_id
   1.165 +
   1.166 +        # Access Level
   1.167 +        level = d.get('al')
   1.168 +
   1.169 +        if level is not None:
   1.170 +            if type(level) is not int:
   1.171 +                try:
   1.172 +                    level = int(level)
   1.173 +
   1.174 +                except (TypeError, ValueError) as e:
   1.175 +                    raise UIDError(f'Не получилось представить уровень допуска пользователя в качестве числа:'
   1.176 +                                   f' level="{level}" err="{e}"'
   1.177 +                                   )
   1.178 +
   1.179 +            params['acc_level'] = level
   1.180 +
   1.181 +        # Access Tags
   1.182 +        acc_tags = d.get('a')
   1.183 +
   1.184 +        if acc_tags is not None:
   1.185 +            try:
   1.186 +                acc_tags = (t for t in iter(acc_tags))
   1.187 +
   1.188 +            except (ValueError, TypeError) as e:
   1.189 +                raise UIDError(f'Не вышло преобразовать теги доступа к строковому значению: '
   1.190 +                               f' acc_tags="{acc_tags}" '
   1.191 +                               f' err="{e}"')
   1.192 +
   1.193 +            params['acc_tags'] = acc_tags
   1.194 +
   1.195 +        # User Metadata
   1.196 +        u_meta = d.get('m')
   1.197 +
   1.198 +        if u_meta is not None:
   1.199 +            if type(u_meta) is not dict:
   1.200 +                raise UIDError(f'Пользовательские метаданные не имеют нужного типа: {type(u_meta).__name__}')
   1.201 +
   1.202 +            try:
   1.203 +                u_meta = dict((str(k), str(v)) for k, v in u_meta.items())
   1.204 +
   1.205 +            except (TypeError, ValueError) as e:
   1.206 +                raise UIDError(f'Не удалось преобразовать в словарь пользовательские метаданные: {e}')
   1.207 +
   1.208 +            params['user_meta'] = u_meta
   1.209 +
   1.210 +        return cls(**params)
   1.211 +
   1.212 +    @classmethod
   1.213 +    def from_env(cls, request: bottle.BaseRequest = bottle.request):
   1.214 +        """\
   1.215 +        Извлечение экземпляра объекта из контекста запроса, если он там сохранён.
   1.216 +        """
   1.217 +        uid = tools.get_env(cls.ENV_NAME, request=request)
   1.218 +
   1.219 +        if uid is None:
   1.220 +            raise IDNotFound()
   1.221 +
   1.222 +        if type(uid) is not cls:
   1.223 +            raise IDCheckError(f'Неожиданный тип ID: type="{type(uid).__name__}" val="{uid}"')
   1.224 +
   1.225 +        return uid
   1.226 +
   1.227 +
   1.228 +class IDHelper(object):
   1.229 +    """\
   1.230 +    Класс поддержки идентификации
   1.231 +    """
   1.232 +    def __init__(self,
   1.233 +                 app_name: str,
   1.234 +                 sign_secret: str,
   1.235 +                 sess_timeout: int = SESSION_TIMEOUT
   1.236 +                 ):
   1.237 +        """\
   1.238 +        :param app_name: Имя приложения. Допускаются цифры символы латиницы и знак ``-``
   1.239 +        :param sign_secret: Секрет, которым будет подписываться JWT.
   1.240 +        :param sess_timeout: Время жизни сессии пользователя.
   1.241 +        """
   1.242 +        if set(ascii_letters + digits + '-') < set(app_name):
   1.243 +            _buf = ', '.join(map(lambda x: f'"{x}"', set(app_name) - set(ascii_letters + digits + '-')))
   1.244 +            raise IDError(f'Не допустимые символы в имени приложения: {_buf}')
   1.245 +
   1.246 +        self.app_name = app_name
   1.247 +        self.cookie_name = f'X-ID-{self.app_name}'
   1.248 +        self.sess_timeout = sess_timeout
   1.249 +        self.jwt_helper = jwt.JWTHelper(sign_secret)
   1.250 +
   1.251 +        # Свойства необходимые для работы в качестве плагина Bottle
   1.252 +        self.name = 'IDHelper'
   1.253 +        self.api = 2
   1.254 +
   1.255 +    def make_cookie(self, uid: Optional[UID] = None,
   1.256 +                    request: bottle.BaseRequest = bottle.request,
   1.257 +                    is_secure: bool = True,
   1.258 +                    is_httponly: bool = True
   1.259 +                    ) -> Cookie:
   1.260 +        """\
   1.261 +        Формируем ``Cookie`` для идентификации сессии пользователя.
   1.262 +
   1.263 +        :param uid: Экземпляр ``UID``, идентифицирующий сессию. Если нет получаем из контекста запроса
   1.264 +        :param request: Экземпляр запроса, на основании которого формируется ответ.
   1.265 +        :param is_secure: Установить признак ``secure`` на cookie
   1.266 +        :param is_httponly: Установить признак ``httponly`` на cookie
   1.267 +        """
   1.268 +        if uid is None:
   1.269 +            uid = UID.from_env(request=request)
   1.270 +
   1.271 +        uid_dict = uid.to_dict()
   1.272 +        uid_dict['fp'] = tools.get_session_fingerprint(uid.get_fp_id(), request=request)
   1.273 +
   1.274 +        return Cookie(
   1.275 +            name=self.cookie_name,
   1.276 +            value=self.jwt_helper.encode(uid.to_dict(), timeout=self.sess_timeout),
   1.277 +            max_age=self.sess_timeout,
   1.278 +            secure=is_secure,
   1.279 +            httponly=is_httponly
   1.280 +        )
   1.281 +
   1.282 +    def set_id(self, uid: Optional[UID] = None, response: bottle.BaseResponse = bottle.response,
   1.283 +               request: bottle.BaseRequest = bottle.request,
   1.284 +               is_secure: bool = True,
   1.285 +               is_httponly: bool = True
   1.286 +               ):
   1.287 +        """\
   1.288 +        Установка cookie на ответ пользователю
   1.289 +
   1.290 +        :param uid: Объект ``UID``, которым идентифицируется сессия. По умолчанию берётся из контекста запроса.
   1.291 +        :param response: Объект ответа, на который устанавливаем cookie. По умолчанию ``bottle.response``
   1.292 +        :param request: Экземпляр запроса, на основании которого формируется ответ. По умолчанию ``bottle.request``
   1.293 +        :param is_secure: Установить признак ``secure`` на cookie
   1.294 +        :param is_httponly: Установить признак ``httponly`` на cookie
   1.295 +        """
   1.296 +        cookie = self.make_cookie(uid=uid, request=request, is_secure=is_secure, is_httponly=is_httponly)
   1.297 +        cookie.response_add(response)
   1.298 +
   1.299 +    def _get_id_impl(self, request: bottle.BaseRequest = bottle.request):
   1.300 +        """\
   1.301 +        Реализация получения ID и сохранения его в контекст запроса для последующего применения
   1.302 +        в других методах класса.
   1.303 +        """
   1.304 +        sid = tools.get_cookie(self.cookie_name)
   1.305 +        if sid is None:
   1.306 +            raise IDNotFound()
   1.307 +
   1.308 +        try:
   1.309 +            uid_raw = self.jwt_helper.decode(sid, check_timeout=True)
   1.310 +            uid = UID.from_dict(uid_raw)
   1.311 +
   1.312 +            fp = tools.get_session_fingerprint(uid.get_fp_id(), request=request)
   1.313 +
   1.314 +            if fp != uid_raw.get('fp'):
   1.315 +                raise IDCheckError('Проверка подписи сессии не прошла')
   1.316 +
   1.317 +            uid.to_env(request=request)
   1.318 +
   1.319 +        except (jwt.JWTAuthError, UIDError) as e:
   1.320 +            raise IDCheckError(f'Ошибка проверки ID: {e}')
   1.321 +
   1.322 +        except (jwt.JWTError, IDError) as e:
   1.323 +            raise IDError(f'Ошибка в обработке ID запроса: {e}')
   1.324 +
   1.325 +    def need_id(self):
   1.326 +        """\
   1.327 +        Декоратор для конкретных точек входа приложения
   1.328 +        """
   1.329 +        def d(callback):
   1.330 +            def f(*a, **kwa):
   1.331 +                self._get_id_impl()
   1.332 +                return callback(*a, **kwa)
   1.333 +
   1.334 +            return f
   1.335 +
   1.336 +        return d
   1.337 +
   1.338 +    @staticmethod
   1.339 +    def env_id(request: bottle.BaseRequest = bottle.request) -> UID:
   1.340 +        """\
   1.341 +        Получаем ID из контекста запроса (для случая, когда проверка и валидация ID проведена на уровне
   1.342 +        плагина или декоратора
   1.343 +        """
   1.344 +        return UID.from_env(request=request)
   1.345 +
   1.346 +    # Реализация методов для работы в качестве Bottle плагина
   1.347 +    def setup(self, app: bottle.Bottle):
   1.348 +        pass
   1.349 +
   1.350 +    def apply(self, callback: Any, route: bottle.Route):
   1.351 +        def f(*a, **kwa):
   1.352 +            self._get_id_impl()
   1.353 +
   1.354 +            return callback(*a, **kwa)
   1.355 +
   1.356 +        return f