py.lib.aw_web_tools
2025-10-15
Parent:9c734271dcf2
py.lib.aw_web_tools/src/aw_web_tools/id_helper.py
.. 1.202510.1 + Режим фековой авторизации для адаптера authelia. Режим требуется для отладки, поскольку вряд ли на машине разработчика будет развёрнуто это ПО на ранних стадиях разработки (преальфа). Возможно режим будет полезен при поиска проблем в приложении при авторизации.
| awgur@5 | 1 # coding: utf-8 |
| awgur@5 | 2 """\ |
| awgur@5 | 3 Модуль вспомогательных средств обеспечения идентификации пользователей |
| awgur@5 | 4 """ |
| awgur@5 | 5 import bottle |
| awgur@5 | 6 from string import ascii_letters, digits |
| awgur@5 | 7 from typing import Optional, Dict, Iterable, Any |
| awgur@9 | 8 |
| awgur@5 | 9 from . import btle_tools as tools |
| awgur@5 | 10 from . import Error |
| awgur@8 | 11 from . import jwt_helper |
| awgur@5 | 12 from .cookie import Cookie |
| awgur@9 | 13 from .authelia_helper import AHAuthError, AutheliaHelper, AHUser |
| awgur@5 | 14 |
| awgur@5 | 15 |
| awgur@5 | 16 SESSION_TIMEOUT = 86400 # Продолжительность сессии по умолчанию. |
| awgur@5 | 17 |
| awgur@5 | 18 |
| awgur@5 | 19 class IDError(Error): |
| awgur@5 | 20 """\ |
| awgur@5 | 21 Общий класс ошибок идентификации |
| awgur@5 | 22 """ |
| awgur@5 | 23 |
| awgur@5 | 24 |
| awgur@5 | 25 class IDCheckError(IDError): |
| awgur@5 | 26 """\ |
| awgur@5 | 27 Ошибка проверки ID |
| awgur@5 | 28 """ |
| awgur@5 | 29 |
| awgur@5 | 30 |
| awgur@5 | 31 class IDNotFound(IDCheckError): |
| awgur@5 | 32 """\ |
| awgur@5 | 33 Запрос не идентифицирован |
| awgur@5 | 34 """ |
| awgur@5 | 35 def __init__(self): |
| awgur@5 | 36 super().__init__('Запрос не идентифицирован') |
| awgur@5 | 37 |
| awgur@5 | 38 |
| awgur@5 | 39 class UIDError(IDError): |
| awgur@5 | 40 """\ |
| awgur@5 | 41 Ошибки при работе с классом пользовательского идентификатора |
| awgur@5 | 42 """ |
| awgur@5 | 43 |
| awgur@5 | 44 |
| awgur@5 | 45 class UID(object): |
| awgur@5 | 46 """\ |
| awgur@5 | 47 Класс пользовательского идентификатора. |
| awgur@5 | 48 |
| awgur@5 | 49 Обеспечивает хранение идентификационной информации о пользователе. |
| awgur@5 | 50 """ |
| awgur@5 | 51 ENV_NAME = 'AW_UID' # Идентификатор для хранения объекта в контексте запроса. |
| awgur@5 | 52 |
| awgur@5 | 53 def __init__(self, uname: str, acc_level: int = -1, |
| awgur@5 | 54 sess_id: Optional[str] = None, |
| awgur@5 | 55 acc_tags: Optional[Iterable[str]] = None, |
| awgur@5 | 56 user_meta: Optional[Dict[str, str]] = None |
| awgur@5 | 57 ): |
| awgur@5 | 58 """\ |
| awgur@5 | 59 :param uname: Имя пользователя внутри системы |
| awgur@5 | 60 :param acc_level: Уровень допуска пользователя. Интерпретация целиком на стороне приложения. |
| awgur@5 | 61 :param sess_id: Идентификатор сессии пользователя |
| awgur@5 | 62 :param acc_tags: Строковые константы, определяющие конкретное множество доступных пользователю ресурсов. |
| awgur@5 | 63 Интерпретация целиком на стороне приложения |
| awgur@5 | 64 :param user_meta: Иная полезная приложению информация, сохранённая в виде тег-значение |
| awgur@5 | 65 """ |
| awgur@5 | 66 self.name = uname |
| awgur@5 | 67 self.level = acc_level |
| awgur@5 | 68 self.sess_id = sess_id |
| awgur@5 | 69 self._access = acc_tags |
| awgur@5 | 70 self._meta = user_meta |
| awgur@5 | 71 |
| awgur@5 | 72 def get_fp_id(self) -> str: |
| awgur@5 | 73 """\ |
| awgur@5 | 74 Преобразует объект в форму, пригодную для получения подписи сессии |
| awgur@5 | 75 """ |
| awgur@5 | 76 buf = ':'.join(sorted(map(lambda x: str(x).lower().strip(), self.get_access()))) |
| awgur@5 | 77 if self.level >= 0: |
| awgur@5 | 78 buf += f':{self.level}' |
| awgur@5 | 79 |
| awgur@5 | 80 return f'{self.name}:{buf}' |
| awgur@5 | 81 |
| awgur@5 | 82 def __str__(self): |
| awgur@5 | 83 return self.name |
| awgur@5 | 84 |
| awgur@5 | 85 def get_access(self): |
| awgur@5 | 86 """\ |
| awgur@5 | 87 Получить теги доступа пользователя, если они имелись |
| awgur@5 | 88 """ |
| awgur@5 | 89 if self._access: |
| awgur@5 | 90 return [i for i in self._access] |
| awgur@5 | 91 |
| awgur@5 | 92 else: |
| awgur@5 | 93 return [] |
| awgur@5 | 94 |
| awgur@5 | 95 def get_meta(self): |
| awgur@5 | 96 """\ |
| awgur@5 | 97 Получить метаданные пользователя, если они имелись. |
| awgur@5 | 98 """ |
| awgur@5 | 99 if self._meta: |
| awgur@5 | 100 return dict([(k, v) for k, v in self._meta.items()]) |
| awgur@5 | 101 |
| awgur@5 | 102 else: |
| awgur@5 | 103 return dict() |
| awgur@5 | 104 |
| awgur@5 | 105 def to_dict(self) -> Dict[str, str]: |
| awgur@5 | 106 """\ |
| awgur@5 | 107 Преобразование текущего состояния объекта в массив |
| awgur@5 | 108 """ |
| awgur@5 | 109 res = dict( |
| awgur@5 | 110 n=self.name, |
| awgur@5 | 111 id=self.sess_id |
| awgur@5 | 112 ) |
| awgur@5 | 113 |
| awgur@5 | 114 if self.level >= 0: |
| awgur@5 | 115 res['al'] = self.level |
| awgur@5 | 116 |
| awgur@5 | 117 if self._access: |
| awgur@5 | 118 res['a'] = self.get_access() |
| awgur@5 | 119 |
| awgur@5 | 120 if self._meta: |
| awgur@5 | 121 res['m'] = self.get_meta() |
| awgur@5 | 122 |
| awgur@5 | 123 return res |
| awgur@5 | 124 |
| awgur@5 | 125 def to_env(self, request: bottle.BaseRequest = bottle.request): |
| awgur@5 | 126 """\ |
| awgur@5 | 127 Сохранение своего экземпляра в контекст запроса |
| awgur@5 | 128 """ |
| awgur@5 | 129 tools.set_env(self.ENV_NAME, self, request=request) |
| awgur@5 | 130 |
| awgur@5 | 131 @classmethod |
| awgur@5 | 132 def from_dict(cls, d: Dict[str, Any]): |
| awgur@5 | 133 """\ |
| awgur@5 | 134 Разбор словаря, созданного методом ``to_dict`` и восстановление из него экземпляра объекта, |
| awgur@5 | 135 подобного сохранённому |
| awgur@5 | 136 """ |
| awgur@5 | 137 params = dict() |
| awgur@5 | 138 |
| awgur@5 | 139 # User name |
| awgur@5 | 140 uname = d.get('n') |
| awgur@5 | 141 try: |
| awgur@5 | 142 uname = str(uname) if uname is not None else '' |
| awgur@5 | 143 |
| awgur@5 | 144 except (ValueError, TypeError) as e: |
| awgur@5 | 145 raise UIDError(f'Не удалось представить имя пользователя как строку: name="{uname}" err="{e}"') |
| awgur@5 | 146 |
| awgur@5 | 147 if not uname: |
| awgur@5 | 148 raise UIDError(f'Идентификатор пользователя не задан в переданном словаре') |
| awgur@5 | 149 |
| awgur@5 | 150 params['uname'] = uname |
| awgur@5 | 151 |
| awgur@5 | 152 # Session Identity |
| awgur@5 | 153 sess_id = d.get('id') |
| awgur@5 | 154 if sess_id is not None: |
| awgur@5 | 155 try: |
| awgur@5 | 156 sess_id = str(sess_id) |
| awgur@5 | 157 |
| awgur@5 | 158 except (TypeError, ValueError) as e: |
| awgur@5 | 159 UIDError(f'Не удалось представить идентификатор сессии как строку: ' |
| awgur@5 | 160 f' sess_id="{sess_id}" ' |
| awgur@5 | 161 f' err="{e}"') |
| awgur@5 | 162 |
| awgur@5 | 163 params['sess_id'] = sess_id |
| awgur@5 | 164 |
| awgur@5 | 165 # Access Level |
| awgur@5 | 166 level = d.get('al') |
| awgur@5 | 167 |
| awgur@5 | 168 if level is not None: |
| awgur@5 | 169 if type(level) is not int: |
| awgur@5 | 170 try: |
| awgur@5 | 171 level = int(level) |
| awgur@5 | 172 |
| awgur@5 | 173 except (TypeError, ValueError) as e: |
| awgur@5 | 174 raise UIDError(f'Не получилось представить уровень допуска пользователя в качестве числа:' |
| awgur@5 | 175 f' level="{level}" err="{e}"' |
| awgur@5 | 176 ) |
| awgur@5 | 177 |
| awgur@5 | 178 params['acc_level'] = level |
| awgur@5 | 179 |
| awgur@5 | 180 # Access Tags |
| awgur@5 | 181 acc_tags = d.get('a') |
| awgur@5 | 182 |
| awgur@5 | 183 if acc_tags is not None: |
| awgur@5 | 184 try: |
| awgur@5 | 185 acc_tags = (t for t in iter(acc_tags)) |
| awgur@5 | 186 |
| awgur@5 | 187 except (ValueError, TypeError) as e: |
| awgur@5 | 188 raise UIDError(f'Не вышло преобразовать теги доступа к строковому значению: ' |
| awgur@5 | 189 f' acc_tags="{acc_tags}" ' |
| awgur@5 | 190 f' err="{e}"') |
| awgur@5 | 191 |
| awgur@5 | 192 params['acc_tags'] = acc_tags |
| awgur@5 | 193 |
| awgur@5 | 194 # User Metadata |
| awgur@5 | 195 u_meta = d.get('m') |
| awgur@5 | 196 |
| awgur@5 | 197 if u_meta is not None: |
| awgur@5 | 198 if type(u_meta) is not dict: |
| awgur@5 | 199 raise UIDError(f'Пользовательские метаданные не имеют нужного типа: {type(u_meta).__name__}') |
| awgur@5 | 200 |
| awgur@5 | 201 try: |
| awgur@5 | 202 u_meta = dict((str(k), str(v)) for k, v in u_meta.items()) |
| awgur@5 | 203 |
| awgur@5 | 204 except (TypeError, ValueError) as e: |
| awgur@5 | 205 raise UIDError(f'Не удалось преобразовать в словарь пользовательские метаданные: {e}') |
| awgur@5 | 206 |
| awgur@5 | 207 params['user_meta'] = u_meta |
| awgur@5 | 208 |
| awgur@5 | 209 return cls(**params) |
| awgur@5 | 210 |
| awgur@5 | 211 @classmethod |
| awgur@5 | 212 def from_env(cls, request: bottle.BaseRequest = bottle.request): |
| awgur@5 | 213 """\ |
| awgur@5 | 214 Извлечение экземпляра объекта из контекста запроса, если он там сохранён. |
| awgur@5 | 215 """ |
| awgur@5 | 216 uid = tools.get_env(cls.ENV_NAME, request=request) |
| awgur@5 | 217 |
| awgur@5 | 218 if uid is None: |
| awgur@5 | 219 raise IDNotFound() |
| awgur@5 | 220 |
| awgur@5 | 221 if type(uid) is not cls: |
| awgur@5 | 222 raise IDCheckError(f'Неожиданный тип ID: type="{type(uid).__name__}" val="{uid}"') |
| awgur@5 | 223 |
| awgur@5 | 224 return uid |
| awgur@5 | 225 |
| awgur@5 | 226 |
| awgur@5 | 227 class IDHelper(object): |
| awgur@5 | 228 """\ |
| awgur@5 | 229 Класс поддержки идентификации |
| awgur@5 | 230 """ |
| awgur@5 | 231 def __init__(self, |
| awgur@5 | 232 app_name: str, |
| awgur@5 | 233 sign_secret: str, |
| awgur@9 | 234 authelia_helper: Optional[AutheliaHelper] = None, |
| awgur@5 | 235 sess_timeout: int = SESSION_TIMEOUT |
| awgur@5 | 236 ): |
| awgur@5 | 237 """\ |
| awgur@5 | 238 :param app_name: Имя приложения. Допускаются цифры символы латиницы и знак ``-`` |
| awgur@9 | 239 :param authelia_helper: Объект AutheliaHelper с описанием системы аутентификации, если используем Authelia |
| awgur@5 | 240 :param sign_secret: Секрет, которым будет подписываться JWT. |
| awgur@5 | 241 :param sess_timeout: Время жизни сессии пользователя. |
| awgur@5 | 242 """ |
| awgur@5 | 243 if set(ascii_letters + digits + '-') < set(app_name): |
| awgur@5 | 244 _buf = ', '.join(map(lambda x: f'"{x}"', set(app_name) - set(ascii_letters + digits + '-'))) |
| awgur@5 | 245 raise IDError(f'Не допустимые символы в имени приложения: {_buf}') |
| awgur@5 | 246 |
| awgur@5 | 247 self.app_name = app_name |
| awgur@5 | 248 self.cookie_name = f'X-ID-{self.app_name}' |
| awgur@5 | 249 self.sess_timeout = sess_timeout |
| awgur@5 | 250 self.jwt_helper = jwt.JWTHelper(sign_secret) |
| awgur@9 | 251 self.ah = authelia_helper |
| awgur@5 | 252 |
| awgur@5 | 253 # Свойства необходимые для работы в качестве плагина Bottle |
| awgur@5 | 254 self.name = 'IDHelper' |
| awgur@5 | 255 self.api = 2 |
| awgur@5 | 256 |
| awgur@5 | 257 def make_cookie(self, uid: Optional[UID] = None, |
| awgur@5 | 258 request: bottle.BaseRequest = bottle.request, |
| awgur@5 | 259 is_secure: bool = True, |
| awgur@5 | 260 is_httponly: bool = True |
| awgur@5 | 261 ) -> Cookie: |
| awgur@5 | 262 """\ |
| awgur@5 | 263 Формируем ``Cookie`` для идентификации сессии пользователя. |
| awgur@5 | 264 |
| awgur@5 | 265 :param uid: Экземпляр ``UID``, идентифицирующий сессию. Если нет получаем из контекста запроса |
| awgur@5 | 266 :param request: Экземпляр запроса, на основании которого формируется ответ. |
| awgur@5 | 267 :param is_secure: Установить признак ``secure`` на cookie |
| awgur@5 | 268 :param is_httponly: Установить признак ``httponly`` на cookie |
| awgur@5 | 269 """ |
| awgur@5 | 270 if uid is None: |
| awgur@5 | 271 uid = UID.from_env(request=request) |
| awgur@5 | 272 |
| awgur@5 | 273 uid_dict = uid.to_dict() |
| awgur@5 | 274 uid_dict['fp'] = tools.get_session_fingerprint(uid.get_fp_id(), request=request) |
| awgur@5 | 275 |
| awgur@5 | 276 return Cookie( |
| awgur@5 | 277 name=self.cookie_name, |
| awgur@5 | 278 value=self.jwt_helper.encode(uid.to_dict(), timeout=self.sess_timeout), |
| awgur@5 | 279 max_age=self.sess_timeout, |
| awgur@5 | 280 secure=is_secure, |
| awgur@5 | 281 httponly=is_httponly |
| awgur@5 | 282 ) |
| awgur@5 | 283 |
| awgur@5 | 284 def set_id(self, uid: Optional[UID] = None, response: bottle.BaseResponse = bottle.response, |
| awgur@5 | 285 request: bottle.BaseRequest = bottle.request, |
| awgur@5 | 286 is_secure: bool = True, |
| awgur@5 | 287 is_httponly: bool = True |
| awgur@5 | 288 ): |
| awgur@5 | 289 """\ |
| awgur@5 | 290 Установка cookie на ответ пользователю |
| awgur@5 | 291 |
| awgur@5 | 292 :param uid: Объект ``UID``, которым идентифицируется сессия. По умолчанию берётся из контекста запроса. |
| awgur@5 | 293 :param response: Объект ответа, на который устанавливаем cookie. По умолчанию ``bottle.response`` |
| awgur@5 | 294 :param request: Экземпляр запроса, на основании которого формируется ответ. По умолчанию ``bottle.request`` |
| awgur@5 | 295 :param is_secure: Установить признак ``secure`` на cookie |
| awgur@5 | 296 :param is_httponly: Установить признак ``httponly`` на cookie |
| awgur@5 | 297 """ |
| awgur@5 | 298 cookie = self.make_cookie(uid=uid, request=request, is_secure=is_secure, is_httponly=is_httponly) |
| awgur@5 | 299 cookie.response_add(response) |
| awgur@5 | 300 |
| awgur@5 | 301 def _get_id_impl(self, request: bottle.BaseRequest = bottle.request): |
| awgur@5 | 302 """\ |
| awgur@5 | 303 Реализация получения ID и сохранения его в контекст запроса для последующего применения |
| awgur@5 | 304 в других методах класса. |
| awgur@5 | 305 """ |
| awgur@5 | 306 sid = tools.get_cookie(self.cookie_name) |
| awgur@5 | 307 if sid is None: |
| awgur@9 | 308 if self.ah is None: |
| awgur@9 | 309 IDNotFound() |
| awgur@5 | 310 |
| awgur@9 | 311 else: |
| awgur@9 | 312 try: |
| awgur@9 | 313 _ahuser = self.ah(request=request) |
| awgur@9 | 314 uid = UID(uname=_ahuser.uname, acc_tags=_ahuser.groups, user_meta={ |
| awgur@9 | 315 'email': _ahuser.email, |
| awgur@9 | 316 'name': _ahuser.name, |
| awgur@9 | 317 }) |
| awgur@5 | 318 |
| awgur@9 | 319 uid.to_env(request=request) |
| awgur@9 | 320 |
| awgur@9 | 321 except AHAuthError: |
| awgur@9 | 322 raise IDNotFound() |
| awgur@5 | 323 |
| awgur@9 | 324 else: |
| awgur@9 | 325 try: |
| awgur@9 | 326 uid_raw = self.jwt_helper.decode(sid, check_timeout=True) |
| awgur@9 | 327 uid = UID.from_dict(uid_raw) |
| awgur@5 | 328 |
| awgur@9 | 329 fp = tools.get_session_fingerprint(uid.get_fp_id(), request=request) |
| awgur@5 | 330 |
| awgur@9 | 331 if fp != uid_raw.get('fp'): |
| awgur@9 | 332 raise IDCheckError('Проверка подписи сессии не прошла') |
| awgur@9 | 333 |
| awgur@9 | 334 uid.to_env(request=request) |
| awgur@5 | 335 |
| awgur@9 | 336 except (jwt.JWTAuthError, UIDError) as e: |
| awgur@9 | 337 raise IDCheckError(f'Ошибка проверки ID: {e}') |
| awgur@9 | 338 |
| awgur@9 | 339 except (jwt.JWTError, IDError) as e: |
| awgur@9 | 340 raise IDError(f'Ошибка в обработке ID запроса: {e}') |
| awgur@5 | 341 |
| awgur@5 | 342 def need_id(self): |
| awgur@5 | 343 """\ |
| awgur@5 | 344 Декоратор для конкретных точек входа приложения |
| awgur@5 | 345 """ |
| awgur@5 | 346 def d(callback): |
| awgur@5 | 347 def f(*a, **kwa): |
| awgur@5 | 348 self._get_id_impl() |
| awgur@5 | 349 return callback(*a, **kwa) |
| awgur@5 | 350 |
| awgur@5 | 351 return f |
| awgur@5 | 352 |
| awgur@5 | 353 return d |
| awgur@5 | 354 |
| awgur@5 | 355 @staticmethod |
| awgur@5 | 356 def env_id(request: bottle.BaseRequest = bottle.request) -> UID: |
| awgur@5 | 357 """\ |
| awgur@5 | 358 Получаем ID из контекста запроса (для случая, когда проверка и валидация ID проведена на уровне |
| awgur@5 | 359 плагина или декоратора |
| awgur@5 | 360 """ |
| awgur@5 | 361 return UID.from_env(request=request) |
| awgur@5 | 362 |
| awgur@5 | 363 # Реализация методов для работы в качестве Bottle плагина |
| awgur@5 | 364 def setup(self, app: bottle.Bottle): |
| awgur@5 | 365 pass |
| awgur@5 | 366 |
| awgur@5 | 367 def apply(self, callback: Any, route: bottle.Route): |
| awgur@5 | 368 def f(*a, **kwa): |
| awgur@5 | 369 self._get_id_impl() |
| awgur@5 | 370 |
| awgur@5 | 371 return callback(*a, **kwa) |
| awgur@5 | 372 |
| awgur@5 | 373 return f |