py.lib.aw_web_tools
py.lib.aw_web_tools/src/aw_web_tools/id_helper.py
.. 1.202411.1 . Переделываем под новые веяния SDK * Рефакторинг
| awgur@5 | 1 # coding: utf-8 |
| awgur@5 | 2 """\ |
| awgur@5 | 3 Модуль вспомогательных средств обеспечения идентификации пользователей |
| awgur@5 | 4 """ |
| awgur@5 | 5 import bottle |
| awgur@5 | 6 from string import ascii_letters, digits |
| awgur@5 | 7 from typing import Optional, Dict, Iterable, Any |
| awgur@5 | 8 from . import btle_tools as tools |
| awgur@5 | 9 from . import Error |
| awgur@8 | 10 from . import jwt_helper |
| awgur@5 | 11 from .cookie import Cookie |
| awgur@5 | 12 |
| awgur@5 | 13 |
| awgur@5 | 14 SESSION_TIMEOUT = 86400 # Продолжительность сессии по умолчанию. |
| awgur@5 | 15 |
| awgur@5 | 16 |
| awgur@5 | 17 class IDError(Error): |
| awgur@5 | 18 """\ |
| awgur@5 | 19 Общий класс ошибок идентификации |
| awgur@5 | 20 """ |
| awgur@5 | 21 |
| awgur@5 | 22 |
| awgur@5 | 23 class IDCheckError(IDError): |
| awgur@5 | 24 """\ |
| awgur@5 | 25 Ошибка проверки ID |
| awgur@5 | 26 """ |
| awgur@5 | 27 |
| awgur@5 | 28 |
| awgur@5 | 29 class IDNotFound(IDCheckError): |
| awgur@5 | 30 """\ |
| awgur@5 | 31 Запрос не идентифицирован |
| awgur@5 | 32 """ |
| awgur@5 | 33 def __init__(self): |
| awgur@5 | 34 super().__init__('Запрос не идентифицирован') |
| awgur@5 | 35 |
| awgur@5 | 36 |
| awgur@5 | 37 class UIDError(IDError): |
| awgur@5 | 38 """\ |
| awgur@5 | 39 Ошибки при работе с классом пользовательского идентификатора |
| awgur@5 | 40 """ |
| awgur@5 | 41 |
| awgur@5 | 42 |
| awgur@5 | 43 class UID(object): |
| awgur@5 | 44 """\ |
| awgur@5 | 45 Класс пользовательского идентификатора. |
| awgur@5 | 46 |
| awgur@5 | 47 Обеспечивает хранение идентификационной информации о пользователе. |
| awgur@5 | 48 """ |
| awgur@5 | 49 ENV_NAME = 'AW_UID' # Идентификатор для хранения объекта в контексте запроса. |
| awgur@5 | 50 |
| awgur@5 | 51 def __init__(self, uname: str, acc_level: int = -1, |
| awgur@5 | 52 sess_id: Optional[str] = None, |
| awgur@5 | 53 acc_tags: Optional[Iterable[str]] = None, |
| awgur@5 | 54 user_meta: Optional[Dict[str, str]] = None |
| awgur@5 | 55 ): |
| awgur@5 | 56 """\ |
| awgur@5 | 57 :param uname: Имя пользователя внутри системы |
| awgur@5 | 58 :param acc_level: Уровень допуска пользователя. Интерпретация целиком на стороне приложения. |
| awgur@5 | 59 :param sess_id: Идентификатор сессии пользователя |
| awgur@5 | 60 :param acc_tags: Строковые константы, определяющие конкретное множество доступных пользователю ресурсов. |
| awgur@5 | 61 Интерпретация целиком на стороне приложения |
| awgur@5 | 62 :param user_meta: Иная полезная приложению информация, сохранённая в виде тег-значение |
| awgur@5 | 63 """ |
| awgur@5 | 64 self.name = uname |
| awgur@5 | 65 self.level = acc_level |
| awgur@5 | 66 self.sess_id = sess_id |
| awgur@5 | 67 self._access = acc_tags |
| awgur@5 | 68 self._meta = user_meta |
| awgur@5 | 69 |
| awgur@5 | 70 def get_fp_id(self) -> str: |
| awgur@5 | 71 """\ |
| awgur@5 | 72 Преобразует объект в форму, пригодную для получения подписи сессии |
| awgur@5 | 73 """ |
| awgur@5 | 74 buf = ':'.join(sorted(map(lambda x: str(x).lower().strip(), self.get_access()))) |
| awgur@5 | 75 if self.level >= 0: |
| awgur@5 | 76 buf += f':{self.level}' |
| awgur@5 | 77 |
| awgur@5 | 78 return f'{self.name}:{buf}' |
| awgur@5 | 79 |
| awgur@5 | 80 def __str__(self): |
| awgur@5 | 81 return self.name |
| awgur@5 | 82 |
| awgur@5 | 83 def get_access(self): |
| awgur@5 | 84 """\ |
| awgur@5 | 85 Получить теги доступа пользователя, если они имелись |
| awgur@5 | 86 """ |
| awgur@5 | 87 if self._access: |
| awgur@5 | 88 return [i for i in self._access] |
| awgur@5 | 89 |
| awgur@5 | 90 else: |
| awgur@5 | 91 return [] |
| awgur@5 | 92 |
| awgur@5 | 93 def get_meta(self): |
| awgur@5 | 94 """\ |
| awgur@5 | 95 Получить метаданные пользователя, если они имелись. |
| awgur@5 | 96 """ |
| awgur@5 | 97 if self._meta: |
| awgur@5 | 98 return dict([(k, v) for k, v in self._meta.items()]) |
| awgur@5 | 99 |
| awgur@5 | 100 else: |
| awgur@5 | 101 return dict() |
| awgur@5 | 102 |
| awgur@5 | 103 def to_dict(self) -> Dict[str, str]: |
| awgur@5 | 104 """\ |
| awgur@5 | 105 Преобразование текущего состояния объекта в массив |
| awgur@5 | 106 """ |
| awgur@5 | 107 res = dict( |
| awgur@5 | 108 n=self.name, |
| awgur@5 | 109 id=self.sess_id |
| awgur@5 | 110 ) |
| awgur@5 | 111 |
| awgur@5 | 112 if self.level >= 0: |
| awgur@5 | 113 res['al'] = self.level |
| awgur@5 | 114 |
| awgur@5 | 115 if self._access: |
| awgur@5 | 116 res['a'] = self.get_access() |
| awgur@5 | 117 |
| awgur@5 | 118 if self._meta: |
| awgur@5 | 119 res['m'] = self.get_meta() |
| awgur@5 | 120 |
| awgur@5 | 121 return res |
| awgur@5 | 122 |
| awgur@5 | 123 def to_env(self, request: bottle.BaseRequest = bottle.request): |
| awgur@5 | 124 """\ |
| awgur@5 | 125 Сохранение своего экземпляра в контекст запроса |
| awgur@5 | 126 """ |
| awgur@5 | 127 tools.set_env(self.ENV_NAME, self, request=request) |
| awgur@5 | 128 |
| awgur@5 | 129 @classmethod |
| awgur@5 | 130 def from_dict(cls, d: Dict[str, Any]): |
| awgur@5 | 131 """\ |
| awgur@5 | 132 Разбор словаря, созданного методом ``to_dict`` и восстановление из него экземпляра объекта, |
| awgur@5 | 133 подобного сохранённому |
| awgur@5 | 134 """ |
| awgur@5 | 135 params = dict() |
| awgur@5 | 136 |
| awgur@5 | 137 # User name |
| awgur@5 | 138 uname = d.get('n') |
| awgur@5 | 139 try: |
| awgur@5 | 140 uname = str(uname) if uname is not None else '' |
| awgur@5 | 141 |
| awgur@5 | 142 except (ValueError, TypeError) as e: |
| awgur@5 | 143 raise UIDError(f'Не удалось представить имя пользователя как строку: name="{uname}" err="{e}"') |
| awgur@5 | 144 |
| awgur@5 | 145 if not uname: |
| awgur@5 | 146 raise UIDError(f'Идентификатор пользователя не задан в переданном словаре') |
| awgur@5 | 147 |
| awgur@5 | 148 params['uname'] = uname |
| awgur@5 | 149 |
| awgur@5 | 150 # Session Identity |
| awgur@5 | 151 sess_id = d.get('id') |
| awgur@5 | 152 if sess_id is not None: |
| awgur@5 | 153 try: |
| awgur@5 | 154 sess_id = str(sess_id) |
| awgur@5 | 155 |
| awgur@5 | 156 except (TypeError, ValueError) as e: |
| awgur@5 | 157 UIDError(f'Не удалось представить идентификатор сессии как строку: ' |
| awgur@5 | 158 f' sess_id="{sess_id}" ' |
| awgur@5 | 159 f' err="{e}"') |
| awgur@5 | 160 |
| awgur@5 | 161 params['sess_id'] = sess_id |
| awgur@5 | 162 |
| awgur@5 | 163 # Access Level |
| awgur@5 | 164 level = d.get('al') |
| awgur@5 | 165 |
| awgur@5 | 166 if level is not None: |
| awgur@5 | 167 if type(level) is not int: |
| awgur@5 | 168 try: |
| awgur@5 | 169 level = int(level) |
| awgur@5 | 170 |
| awgur@5 | 171 except (TypeError, ValueError) as e: |
| awgur@5 | 172 raise UIDError(f'Не получилось представить уровень допуска пользователя в качестве числа:' |
| awgur@5 | 173 f' level="{level}" err="{e}"' |
| awgur@5 | 174 ) |
| awgur@5 | 175 |
| awgur@5 | 176 params['acc_level'] = level |
| awgur@5 | 177 |
| awgur@5 | 178 # Access Tags |
| awgur@5 | 179 acc_tags = d.get('a') |
| awgur@5 | 180 |
| awgur@5 | 181 if acc_tags is not None: |
| awgur@5 | 182 try: |
| awgur@5 | 183 acc_tags = (t for t in iter(acc_tags)) |
| awgur@5 | 184 |
| awgur@5 | 185 except (ValueError, TypeError) as e: |
| awgur@5 | 186 raise UIDError(f'Не вышло преобразовать теги доступа к строковому значению: ' |
| awgur@5 | 187 f' acc_tags="{acc_tags}" ' |
| awgur@5 | 188 f' err="{e}"') |
| awgur@5 | 189 |
| awgur@5 | 190 params['acc_tags'] = acc_tags |
| awgur@5 | 191 |
| awgur@5 | 192 # User Metadata |
| awgur@5 | 193 u_meta = d.get('m') |
| awgur@5 | 194 |
| awgur@5 | 195 if u_meta is not None: |
| awgur@5 | 196 if type(u_meta) is not dict: |
| awgur@5 | 197 raise UIDError(f'Пользовательские метаданные не имеют нужного типа: {type(u_meta).__name__}') |
| awgur@5 | 198 |
| awgur@5 | 199 try: |
| awgur@5 | 200 u_meta = dict((str(k), str(v)) for k, v in u_meta.items()) |
| awgur@5 | 201 |
| awgur@5 | 202 except (TypeError, ValueError) as e: |
| awgur@5 | 203 raise UIDError(f'Не удалось преобразовать в словарь пользовательские метаданные: {e}') |
| awgur@5 | 204 |
| awgur@5 | 205 params['user_meta'] = u_meta |
| awgur@5 | 206 |
| awgur@5 | 207 return cls(**params) |
| awgur@5 | 208 |
| awgur@5 | 209 @classmethod |
| awgur@5 | 210 def from_env(cls, request: bottle.BaseRequest = bottle.request): |
| awgur@5 | 211 """\ |
| awgur@5 | 212 Извлечение экземпляра объекта из контекста запроса, если он там сохранён. |
| awgur@5 | 213 """ |
| awgur@5 | 214 uid = tools.get_env(cls.ENV_NAME, request=request) |
| awgur@5 | 215 |
| awgur@5 | 216 if uid is None: |
| awgur@5 | 217 raise IDNotFound() |
| awgur@5 | 218 |
| awgur@5 | 219 if type(uid) is not cls: |
| awgur@5 | 220 raise IDCheckError(f'Неожиданный тип ID: type="{type(uid).__name__}" val="{uid}"') |
| awgur@5 | 221 |
| awgur@5 | 222 return uid |
| awgur@5 | 223 |
| awgur@5 | 224 |
| awgur@5 | 225 class IDHelper(object): |
| awgur@5 | 226 """\ |
| awgur@5 | 227 Класс поддержки идентификации |
| awgur@5 | 228 """ |
| awgur@5 | 229 def __init__(self, |
| awgur@5 | 230 app_name: str, |
| awgur@5 | 231 sign_secret: str, |
| awgur@5 | 232 sess_timeout: int = SESSION_TIMEOUT |
| awgur@5 | 233 ): |
| awgur@5 | 234 """\ |
| awgur@5 | 235 :param app_name: Имя приложения. Допускаются цифры символы латиницы и знак ``-`` |
| awgur@5 | 236 :param sign_secret: Секрет, которым будет подписываться JWT. |
| awgur@5 | 237 :param sess_timeout: Время жизни сессии пользователя. |
| awgur@5 | 238 """ |
| awgur@5 | 239 if set(ascii_letters + digits + '-') < set(app_name): |
| awgur@5 | 240 _buf = ', '.join(map(lambda x: f'"{x}"', set(app_name) - set(ascii_letters + digits + '-'))) |
| awgur@5 | 241 raise IDError(f'Не допустимые символы в имени приложения: {_buf}') |
| awgur@5 | 242 |
| awgur@5 | 243 self.app_name = app_name |
| awgur@5 | 244 self.cookie_name = f'X-ID-{self.app_name}' |
| awgur@5 | 245 self.sess_timeout = sess_timeout |
| awgur@5 | 246 self.jwt_helper = jwt.JWTHelper(sign_secret) |
| awgur@5 | 247 |
| awgur@5 | 248 # Свойства необходимые для работы в качестве плагина Bottle |
| awgur@5 | 249 self.name = 'IDHelper' |
| awgur@5 | 250 self.api = 2 |
| awgur@5 | 251 |
| awgur@5 | 252 def make_cookie(self, uid: Optional[UID] = None, |
| awgur@5 | 253 request: bottle.BaseRequest = bottle.request, |
| awgur@5 | 254 is_secure: bool = True, |
| awgur@5 | 255 is_httponly: bool = True |
| awgur@5 | 256 ) -> Cookie: |
| awgur@5 | 257 """\ |
| awgur@5 | 258 Формируем ``Cookie`` для идентификации сессии пользователя. |
| awgur@5 | 259 |
| awgur@5 | 260 :param uid: Экземпляр ``UID``, идентифицирующий сессию. Если нет получаем из контекста запроса |
| awgur@5 | 261 :param request: Экземпляр запроса, на основании которого формируется ответ. |
| awgur@5 | 262 :param is_secure: Установить признак ``secure`` на cookie |
| awgur@5 | 263 :param is_httponly: Установить признак ``httponly`` на cookie |
| awgur@5 | 264 """ |
| awgur@5 | 265 if uid is None: |
| awgur@5 | 266 uid = UID.from_env(request=request) |
| awgur@5 | 267 |
| awgur@5 | 268 uid_dict = uid.to_dict() |
| awgur@5 | 269 uid_dict['fp'] = tools.get_session_fingerprint(uid.get_fp_id(), request=request) |
| awgur@5 | 270 |
| awgur@5 | 271 return Cookie( |
| awgur@5 | 272 name=self.cookie_name, |
| awgur@5 | 273 value=self.jwt_helper.encode(uid.to_dict(), timeout=self.sess_timeout), |
| awgur@5 | 274 max_age=self.sess_timeout, |
| awgur@5 | 275 secure=is_secure, |
| awgur@5 | 276 httponly=is_httponly |
| awgur@5 | 277 ) |
| awgur@5 | 278 |
| awgur@5 | 279 def set_id(self, uid: Optional[UID] = None, response: bottle.BaseResponse = bottle.response, |
| awgur@5 | 280 request: bottle.BaseRequest = bottle.request, |
| awgur@5 | 281 is_secure: bool = True, |
| awgur@5 | 282 is_httponly: bool = True |
| awgur@5 | 283 ): |
| awgur@5 | 284 """\ |
| awgur@5 | 285 Установка cookie на ответ пользователю |
| awgur@5 | 286 |
| awgur@5 | 287 :param uid: Объект ``UID``, которым идентифицируется сессия. По умолчанию берётся из контекста запроса. |
| awgur@5 | 288 :param response: Объект ответа, на который устанавливаем cookie. По умолчанию ``bottle.response`` |
| awgur@5 | 289 :param request: Экземпляр запроса, на основании которого формируется ответ. По умолчанию ``bottle.request`` |
| awgur@5 | 290 :param is_secure: Установить признак ``secure`` на cookie |
| awgur@5 | 291 :param is_httponly: Установить признак ``httponly`` на cookie |
| awgur@5 | 292 """ |
| awgur@5 | 293 cookie = self.make_cookie(uid=uid, request=request, is_secure=is_secure, is_httponly=is_httponly) |
| awgur@5 | 294 cookie.response_add(response) |
| awgur@5 | 295 |
| awgur@5 | 296 def _get_id_impl(self, request: bottle.BaseRequest = bottle.request): |
| awgur@5 | 297 """\ |
| awgur@5 | 298 Реализация получения ID и сохранения его в контекст запроса для последующего применения |
| awgur@5 | 299 в других методах класса. |
| awgur@5 | 300 """ |
| awgur@5 | 301 sid = tools.get_cookie(self.cookie_name) |
| awgur@5 | 302 if sid is None: |
| awgur@5 | 303 raise IDNotFound() |
| awgur@5 | 304 |
| awgur@5 | 305 try: |
| awgur@5 | 306 uid_raw = self.jwt_helper.decode(sid, check_timeout=True) |
| awgur@5 | 307 uid = UID.from_dict(uid_raw) |
| awgur@5 | 308 |
| awgur@5 | 309 fp = tools.get_session_fingerprint(uid.get_fp_id(), request=request) |
| awgur@5 | 310 |
| awgur@5 | 311 if fp != uid_raw.get('fp'): |
| awgur@5 | 312 raise IDCheckError('Проверка подписи сессии не прошла') |
| awgur@5 | 313 |
| awgur@5 | 314 uid.to_env(request=request) |
| awgur@5 | 315 |
| awgur@5 | 316 except (jwt.JWTAuthError, UIDError) as e: |
| awgur@5 | 317 raise IDCheckError(f'Ошибка проверки ID: {e}') |
| awgur@5 | 318 |
| awgur@5 | 319 except (jwt.JWTError, IDError) as e: |
| awgur@5 | 320 raise IDError(f'Ошибка в обработке ID запроса: {e}') |
| awgur@5 | 321 |
| awgur@5 | 322 def need_id(self): |
| awgur@5 | 323 """\ |
| awgur@5 | 324 Декоратор для конкретных точек входа приложения |
| awgur@5 | 325 """ |
| awgur@5 | 326 def d(callback): |
| awgur@5 | 327 def f(*a, **kwa): |
| awgur@5 | 328 self._get_id_impl() |
| awgur@5 | 329 return callback(*a, **kwa) |
| awgur@5 | 330 |
| awgur@5 | 331 return f |
| awgur@5 | 332 |
| awgur@5 | 333 return d |
| awgur@5 | 334 |
| awgur@5 | 335 @staticmethod |
| awgur@5 | 336 def env_id(request: bottle.BaseRequest = bottle.request) -> UID: |
| awgur@5 | 337 """\ |
| awgur@5 | 338 Получаем ID из контекста запроса (для случая, когда проверка и валидация ID проведена на уровне |
| awgur@5 | 339 плагина или декоратора |
| awgur@5 | 340 """ |
| awgur@5 | 341 return UID.from_env(request=request) |
| awgur@5 | 342 |
| awgur@5 | 343 # Реализация методов для работы в качестве Bottle плагина |
| awgur@5 | 344 def setup(self, app: bottle.Bottle): |
| awgur@5 | 345 pass |
| awgur@5 | 346 |
| awgur@5 | 347 def apply(self, callback: Any, route: bottle.Route): |
| awgur@5 | 348 def f(*a, **kwa): |
| awgur@5 | 349 self._get_id_impl() |
| awgur@5 | 350 |
| awgur@5 | 351 return callback(*a, **kwa) |
| awgur@5 | 352 |
| awgur@5 | 353 return f |