py.lib.aw_web_tools
2025-10-19
Parent:9c734271dcf2
py.lib.aw_web_tools/src/aw_web_tools/id_helper.py
. Не изменена версия в редыдущем коммите
| awgur@5 | 1 # coding: utf-8 |
| awgur@5 | 2 """\ |
| awgur@5 | 3 Модуль вспомогательных средств обеспечения идентификации пользователей |
| awgur@5 | 4 """ |
| awgur@5 | 5 import bottle |
| awgur@5 | 6 from string import ascii_letters, digits |
| awgur@5 | 7 from typing import Optional, Dict, Iterable, Any |
| awgur@9 | 8 |
| awgur@5 | 9 from . import btle_tools as tools |
| awgur@5 | 10 from . import Error |
| awgur@8 | 11 from . import jwt_helper |
| awgur@5 | 12 from .cookie import Cookie |
| awgur@9 | 13 from .authelia_helper import AHAuthError, AutheliaHelper, AHUser |
| awgur@5 | 14 |
| awgur@5 | 15 |
| awgur@5 | 16 SESSION_TIMEOUT = 86400 # Продолжительность сессии по умолчанию. |
| awgur@5 | 17 |
| awgur@5 | 18 |
| awgur@5 | 19 class IDError(Error): |
| awgur@5 | 20 """\ |
| awgur@5 | 21 Общий класс ошибок идентификации |
| awgur@5 | 22 """ |
| awgur@5 | 23 |
| awgur@5 | 24 |
| awgur@5 | 25 class IDCheckError(IDError): |
| awgur@5 | 26 """\ |
| awgur@5 | 27 Ошибка проверки ID |
| awgur@5 | 28 """ |
| awgur@5 | 29 |
| awgur@5 | 30 |
| awgur@5 | 31 class IDNotFound(IDCheckError): |
| awgur@5 | 32 """\ |
| awgur@5 | 33 Запрос не идентифицирован |
| awgur@5 | 34 """ |
| awgur@5 | 35 def __init__(self): |
| awgur@5 | 36 super().__init__('Запрос не идентифицирован') |
| awgur@5 | 37 |
| awgur@5 | 38 |
| awgur@5 | 39 class UIDError(IDError): |
| awgur@5 | 40 """\ |
| awgur@5 | 41 Ошибки при работе с классом пользовательского идентификатора |
| awgur@5 | 42 """ |
| awgur@5 | 43 |
| awgur@5 | 44 |
| awgur@5 | 45 class UID(object): |
| awgur@5 | 46 """\ |
| awgur@5 | 47 Класс пользовательского идентификатора. |
| awgur@5 | 48 |
| awgur@5 | 49 Обеспечивает хранение идентификационной информации о пользователе. |
| awgur@5 | 50 """ |
| awgur@5 | 51 ENV_NAME = 'AW_UID' # Идентификатор для хранения объекта в контексте запроса. |
| awgur@5 | 52 |
| awgur@5 | 53 def __init__(self, uname: str, acc_level: int = -1, |
| awgur@5 | 54 sess_id: Optional[str] = None, |
| awgur@5 | 55 acc_tags: Optional[Iterable[str]] = None, |
| awgur@5 | 56 user_meta: Optional[Dict[str, str]] = None |
| awgur@5 | 57 ): |
| awgur@5 | 58 """\ |
| awgur@5 | 59 :param uname: Имя пользователя внутри системы |
| awgur@5 | 60 :param acc_level: Уровень допуска пользователя. Интерпретация целиком на стороне приложения. |
| awgur@5 | 61 :param sess_id: Идентификатор сессии пользователя |
| awgur@5 | 62 :param acc_tags: Строковые константы, определяющие конкретное множество доступных пользователю ресурсов. |
| awgur@5 | 63 Интерпретация целиком на стороне приложения |
| awgur@5 | 64 :param user_meta: Иная полезная приложению информация, сохранённая в виде тег-значение |
| awgur@5 | 65 """ |
| awgur@5 | 66 self.name = uname |
| awgur@5 | 67 self.level = acc_level |
| awgur@5 | 68 self.sess_id = sess_id |
| awgur@5 | 69 self._access = acc_tags |
| awgur@5 | 70 self._meta = user_meta |
| awgur@5 | 71 |
| awgur@5 | 72 def get_fp_id(self) -> str: |
| awgur@5 | 73 """\ |
| awgur@5 | 74 Преобразует объект в форму, пригодную для получения подписи сессии |
| awgur@5 | 75 """ |
| awgur@5 | 76 buf = ':'.join(sorted(map(lambda x: str(x).lower().strip(), self.get_access()))) |
| awgur@5 | 77 if self.level >= 0: |
| awgur@5 | 78 buf += f':{self.level}' |
| awgur@5 | 79 |
| awgur@5 | 80 return f'{self.name}:{buf}' |
| awgur@5 | 81 |
| awgur@5 | 82 def __str__(self): |
| awgur@5 | 83 return self.name |
| awgur@5 | 84 |
| awgur@5 | 85 def get_access(self): |
| awgur@5 | 86 """\ |
| awgur@5 | 87 Получить теги доступа пользователя, если они имелись |
| awgur@5 | 88 """ |
| awgur@5 | 89 if self._access: |
| awgur@5 | 90 return [i for i in self._access] |
| awgur@5 | 91 |
| awgur@5 | 92 else: |
| awgur@5 | 93 return [] |
| awgur@5 | 94 |
| awgur@5 | 95 def get_meta(self): |
| awgur@5 | 96 """\ |
| awgur@5 | 97 Получить метаданные пользователя, если они имелись. |
| awgur@5 | 98 """ |
| awgur@5 | 99 if self._meta: |
| awgur@5 | 100 return dict([(k, v) for k, v in self._meta.items()]) |
| awgur@5 | 101 |
| awgur@5 | 102 else: |
| awgur@5 | 103 return dict() |
| awgur@5 | 104 |
| awgur@5 | 105 def to_dict(self) -> Dict[str, str]: |
| awgur@5 | 106 """\ |
| awgur@5 | 107 Преобразование текущего состояния объекта в массив |
| awgur@5 | 108 """ |
| awgur@5 | 109 res = dict( |
| awgur@5 | 110 n=self.name, |
| awgur@5 | 111 id=self.sess_id |
| awgur@5 | 112 ) |
| awgur@5 | 113 |
| awgur@5 | 114 if self.level >= 0: |
| awgur@5 | 115 res['al'] = self.level |
| awgur@5 | 116 |
| awgur@5 | 117 if self._access: |
| awgur@5 | 118 res['a'] = self.get_access() |
| awgur@5 | 119 |
| awgur@5 | 120 if self._meta: |
| awgur@5 | 121 res['m'] = self.get_meta() |
| awgur@5 | 122 |
| awgur@5 | 123 return res |
| awgur@5 | 124 |
| awgur@5 | 125 def to_env(self, request: bottle.BaseRequest = bottle.request): |
| awgur@5 | 126 """\ |
| awgur@5 | 127 Сохранение своего экземпляра в контекст запроса |
| awgur@5 | 128 """ |
| awgur@5 | 129 tools.set_env(self.ENV_NAME, self, request=request) |
| awgur@5 | 130 |
| awgur@5 | 131 @classmethod |
| awgur@5 | 132 def from_dict(cls, d: Dict[str, Any]): |
| awgur@5 | 133 """\ |
| awgur@5 | 134 Разбор словаря, созданного методом ``to_dict`` и восстановление из него экземпляра объекта, |
| awgur@5 | 135 подобного сохранённому |
| awgur@5 | 136 """ |
| awgur@5 | 137 params = dict() |
| awgur@5 | 138 |
| awgur@5 | 139 # User name |
| awgur@5 | 140 uname = d.get('n') |
| awgur@5 | 141 try: |
| awgur@5 | 142 uname = str(uname) if uname is not None else '' |
| awgur@5 | 143 |
| awgur@5 | 144 except (ValueError, TypeError) as e: |
| awgur@5 | 145 raise UIDError(f'Не удалось представить имя пользователя как строку: name="{uname}" err="{e}"') |
| awgur@5 | 146 |
| awgur@5 | 147 if not uname: |
| awgur@5 | 148 raise UIDError(f'Идентификатор пользователя не задан в переданном словаре') |
| awgur@5 | 149 |
| awgur@5 | 150 params['uname'] = uname |
| awgur@5 | 151 |
| awgur@5 | 152 # Session Identity |
| awgur@5 | 153 sess_id = d.get('id') |
| awgur@5 | 154 if sess_id is not None: |
| awgur@5 | 155 try: |
| awgur@5 | 156 sess_id = str(sess_id) |
| awgur@5 | 157 |
| awgur@5 | 158 except (TypeError, ValueError) as e: |
| awgur@5 | 159 UIDError(f'Не удалось представить идентификатор сессии как строку: ' |
| awgur@5 | 160 f' sess_id="{sess_id}" ' |
| awgur@5 | 161 f' err="{e}"') |
| awgur@5 | 162 |
| awgur@5 | 163 params['sess_id'] = sess_id |
| awgur@5 | 164 |
| awgur@5 | 165 # Access Level |
| awgur@5 | 166 level = d.get('al') |
| awgur@5 | 167 |
| awgur@5 | 168 if level is not None: |
| awgur@5 | 169 if type(level) is not int: |
| awgur@5 | 170 try: |
| awgur@5 | 171 level = int(level) |
| awgur@5 | 172 |
| awgur@5 | 173 except (TypeError, ValueError) as e: |
| awgur@5 | 174 raise UIDError(f'Не получилось представить уровень допуска пользователя в качестве числа:' |
| awgur@5 | 175 f' level="{level}" err="{e}"' |
| awgur@5 | 176 ) |
| awgur@5 | 177 |
| awgur@5 | 178 params['acc_level'] = level |
| awgur@5 | 179 |
| awgur@5 | 180 # Access Tags |
| awgur@5 | 181 acc_tags = d.get('a') |
| awgur@5 | 182 |
| awgur@5 | 183 if acc_tags is not None: |
| awgur@5 | 184 try: |
| awgur@5 | 185 acc_tags = (t for t in iter(acc_tags)) |
| awgur@5 | 186 |
| awgur@5 | 187 except (ValueError, TypeError) as e: |
| awgur@5 | 188 raise UIDError(f'Не вышло преобразовать теги доступа к строковому значению: ' |
| awgur@5 | 189 f' acc_tags="{acc_tags}" ' |
| awgur@5 | 190 f' err="{e}"') |
| awgur@5 | 191 |
| awgur@5 | 192 params['acc_tags'] = acc_tags |
| awgur@5 | 193 |
| awgur@5 | 194 # User Metadata |
| awgur@5 | 195 u_meta = d.get('m') |
| awgur@5 | 196 |
| awgur@5 | 197 if u_meta is not None: |
| awgur@5 | 198 if type(u_meta) is not dict: |
| awgur@5 | 199 raise UIDError(f'Пользовательские метаданные не имеют нужного типа: {type(u_meta).__name__}') |
| awgur@5 | 200 |
| awgur@5 | 201 try: |
| awgur@5 | 202 u_meta = dict((str(k), str(v)) for k, v in u_meta.items()) |
| awgur@5 | 203 |
| awgur@5 | 204 except (TypeError, ValueError) as e: |
| awgur@5 | 205 raise UIDError(f'Не удалось преобразовать в словарь пользовательские метаданные: {e}') |
| awgur@5 | 206 |
| awgur@5 | 207 params['user_meta'] = u_meta |
| awgur@5 | 208 |
| awgur@5 | 209 return cls(**params) |
| awgur@5 | 210 |
| awgur@5 | 211 @classmethod |
| awgur@5 | 212 def from_env(cls, request: bottle.BaseRequest = bottle.request): |
| awgur@5 | 213 """\ |
| awgur@5 | 214 Извлечение экземпляра объекта из контекста запроса, если он там сохранён. |
| awgur@5 | 215 """ |
| awgur@5 | 216 uid = tools.get_env(cls.ENV_NAME, request=request) |
| awgur@5 | 217 |
| awgur@5 | 218 if uid is None: |
| awgur@5 | 219 raise IDNotFound() |
| awgur@5 | 220 |
| awgur@5 | 221 if type(uid) is not cls: |
| awgur@5 | 222 raise IDCheckError(f'Неожиданный тип ID: type="{type(uid).__name__}" val="{uid}"') |
| awgur@5 | 223 |
| awgur@5 | 224 return uid |
| awgur@5 | 225 |
| awgur@5 | 226 |
| awgur@5 | 227 class IDHelper(object): |
| awgur@5 | 228 """\ |
| awgur@5 | 229 Класс поддержки идентификации |
| awgur@5 | 230 """ |
| awgur@5 | 231 def __init__(self, |
| awgur@5 | 232 app_name: str, |
| awgur@5 | 233 sign_secret: str, |
| awgur@9 | 234 authelia_helper: Optional[AutheliaHelper] = None, |
| awgur@5 | 235 sess_timeout: int = SESSION_TIMEOUT |
| awgur@5 | 236 ): |
| awgur@5 | 237 """\ |
| awgur@5 | 238 :param app_name: Имя приложения. Допускаются цифры символы латиницы и знак ``-`` |
| awgur@9 | 239 :param authelia_helper: Объект AutheliaHelper с описанием системы аутентификации, если используем Authelia |
| awgur@5 | 240 :param sign_secret: Секрет, которым будет подписываться JWT. |
| awgur@5 | 241 :param sess_timeout: Время жизни сессии пользователя. |
| awgur@5 | 242 """ |
| awgur@5 | 243 if set(ascii_letters + digits + '-') < set(app_name): |
| awgur@5 | 244 _buf = ', '.join(map(lambda x: f'"{x}"', set(app_name) - set(ascii_letters + digits + '-'))) |
| awgur@5 | 245 raise IDError(f'Не допустимые символы в имени приложения: {_buf}') |
| awgur@5 | 246 |
| awgur@5 | 247 self.app_name = app_name |
| awgur@5 | 248 self.cookie_name = f'X-ID-{self.app_name}' |
| awgur@5 | 249 self.sess_timeout = sess_timeout |
| awgur@5 | 250 self.jwt_helper = jwt.JWTHelper(sign_secret) |
| awgur@9 | 251 self.ah = authelia_helper |
| awgur@5 | 252 |
| awgur@5 | 253 # Свойства необходимые для работы в качестве плагина Bottle |
| awgur@5 | 254 self.name = 'IDHelper' |
| awgur@5 | 255 self.api = 2 |
| awgur@5 | 256 |
| awgur@5 | 257 def make_cookie(self, uid: Optional[UID] = None, |
| awgur@5 | 258 request: bottle.BaseRequest = bottle.request, |
| awgur@5 | 259 is_secure: bool = True, |
| awgur@5 | 260 is_httponly: bool = True |
| awgur@5 | 261 ) -> Cookie: |
| awgur@5 | 262 """\ |
| awgur@5 | 263 Формируем ``Cookie`` для идентификации сессии пользователя. |
| awgur@5 | 264 |
| awgur@5 | 265 :param uid: Экземпляр ``UID``, идентифицирующий сессию. Если нет получаем из контекста запроса |
| awgur@5 | 266 :param request: Экземпляр запроса, на основании которого формируется ответ. |
| awgur@5 | 267 :param is_secure: Установить признак ``secure`` на cookie |
| awgur@5 | 268 :param is_httponly: Установить признак ``httponly`` на cookie |
| awgur@5 | 269 """ |
| awgur@5 | 270 if uid is None: |
| awgur@5 | 271 uid = UID.from_env(request=request) |
| awgur@5 | 272 |
| awgur@5 | 273 uid_dict = uid.to_dict() |
| awgur@5 | 274 uid_dict['fp'] = tools.get_session_fingerprint(uid.get_fp_id(), request=request) |
| awgur@5 | 275 |
| awgur@5 | 276 return Cookie( |
| awgur@5 | 277 name=self.cookie_name, |
| awgur@5 | 278 value=self.jwt_helper.encode(uid.to_dict(), timeout=self.sess_timeout), |
| awgur@5 | 279 max_age=self.sess_timeout, |
| awgur@5 | 280 secure=is_secure, |
| awgur@5 | 281 httponly=is_httponly |
| awgur@5 | 282 ) |
| awgur@5 | 283 |
| awgur@5 | 284 def set_id(self, uid: Optional[UID] = None, response: bottle.BaseResponse = bottle.response, |
| awgur@5 | 285 request: bottle.BaseRequest = bottle.request, |
| awgur@5 | 286 is_secure: bool = True, |
| awgur@5 | 287 is_httponly: bool = True |
| awgur@5 | 288 ): |
| awgur@5 | 289 """\ |
| awgur@5 | 290 Установка cookie на ответ пользователю |
| awgur@5 | 291 |
| awgur@5 | 292 :param uid: Объект ``UID``, которым идентифицируется сессия. По умолчанию берётся из контекста запроса. |
| awgur@5 | 293 :param response: Объект ответа, на который устанавливаем cookie. По умолчанию ``bottle.response`` |
| awgur@5 | 294 :param request: Экземпляр запроса, на основании которого формируется ответ. По умолчанию ``bottle.request`` |
| awgur@5 | 295 :param is_secure: Установить признак ``secure`` на cookie |
| awgur@5 | 296 :param is_httponly: Установить признак ``httponly`` на cookie |
| awgur@5 | 297 """ |
| awgur@5 | 298 cookie = self.make_cookie(uid=uid, request=request, is_secure=is_secure, is_httponly=is_httponly) |
| awgur@5 | 299 cookie.response_add(response) |
| awgur@5 | 300 |
| awgur@5 | 301 def _get_id_impl(self, request: bottle.BaseRequest = bottle.request): |
| awgur@5 | 302 """\ |
| awgur@5 | 303 Реализация получения ID и сохранения его в контекст запроса для последующего применения |
| awgur@5 | 304 в других методах класса. |
| awgur@5 | 305 """ |
| awgur@5 | 306 sid = tools.get_cookie(self.cookie_name) |
| awgur@5 | 307 if sid is None: |
| awgur@9 | 308 if self.ah is None: |
| awgur@9 | 309 IDNotFound() |
| awgur@5 | 310 |
| awgur@9 | 311 else: |
| awgur@9 | 312 try: |
| awgur@9 | 313 _ahuser = self.ah(request=request) |
| awgur@9 | 314 uid = UID(uname=_ahuser.uname, acc_tags=_ahuser.groups, user_meta={ |
| awgur@9 | 315 'email': _ahuser.email, |
| awgur@9 | 316 'name': _ahuser.name, |
| awgur@9 | 317 }) |
| awgur@5 | 318 |
| awgur@9 | 319 uid.to_env(request=request) |
| awgur@9 | 320 |
| awgur@9 | 321 except AHAuthError: |
| awgur@9 | 322 raise IDNotFound() |
| awgur@5 | 323 |
| awgur@9 | 324 else: |
| awgur@9 | 325 try: |
| awgur@9 | 326 uid_raw = self.jwt_helper.decode(sid, check_timeout=True) |
| awgur@9 | 327 uid = UID.from_dict(uid_raw) |
| awgur@5 | 328 |
| awgur@9 | 329 fp = tools.get_session_fingerprint(uid.get_fp_id(), request=request) |
| awgur@5 | 330 |
| awgur@9 | 331 if fp != uid_raw.get('fp'): |
| awgur@9 | 332 raise IDCheckError('Проверка подписи сессии не прошла') |
| awgur@9 | 333 |
| awgur@9 | 334 uid.to_env(request=request) |
| awgur@5 | 335 |
| awgur@9 | 336 except (jwt.JWTAuthError, UIDError) as e: |
| awgur@9 | 337 raise IDCheckError(f'Ошибка проверки ID: {e}') |
| awgur@9 | 338 |
| awgur@9 | 339 except (jwt.JWTError, IDError) as e: |
| awgur@9 | 340 raise IDError(f'Ошибка в обработке ID запроса: {e}') |
| awgur@5 | 341 |
| awgur@5 | 342 def need_id(self): |
| awgur@5 | 343 """\ |
| awgur@5 | 344 Декоратор для конкретных точек входа приложения |
| awgur@5 | 345 """ |
| awgur@5 | 346 def d(callback): |
| awgur@5 | 347 def f(*a, **kwa): |
| awgur@5 | 348 self._get_id_impl() |
| awgur@5 | 349 return callback(*a, **kwa) |
| awgur@5 | 350 |
| awgur@5 | 351 return f |
| awgur@5 | 352 |
| awgur@5 | 353 return d |
| awgur@5 | 354 |
| awgur@5 | 355 @staticmethod |
| awgur@5 | 356 def env_id(request: bottle.BaseRequest = bottle.request) -> UID: |
| awgur@5 | 357 """\ |
| awgur@5 | 358 Получаем ID из контекста запроса (для случая, когда проверка и валидация ID проведена на уровне |
| awgur@5 | 359 плагина или декоратора |
| awgur@5 | 360 """ |
| awgur@5 | 361 return UID.from_env(request=request) |
| awgur@5 | 362 |
| awgur@5 | 363 # Реализация методов для работы в качестве Bottle плагина |
| awgur@5 | 364 def setup(self, app: bottle.Bottle): |
| awgur@5 | 365 pass |
| awgur@5 | 366 |
| awgur@5 | 367 def apply(self, callback: Any, route: bottle.Route): |
| awgur@5 | 368 def f(*a, **kwa): |
| awgur@5 | 369 self._get_id_impl() |
| awgur@5 | 370 |
| awgur@5 | 371 return callback(*a, **kwa) |
| awgur@5 | 372 |
| awgur@5 | 373 return f |