py.lib.aw_web_tools

Yohn Y. 2025-10-15 Parent:9c734271dcf2

15:645c171efc96 Go to Latest

py.lib.aw_web_tools/src/aw_web_tools/id_helper.py

* Изменения в системе сборки пакетов Python

History
1 # coding: utf-8
2 """\
3 Модуль вспомогательных средств обеспечения идентификации пользователей
4 """
5 import bottle
6 from string import ascii_letters, digits
7 from typing import Optional, Dict, Iterable, Any
9 from . import btle_tools as tools
10 from . import Error
11 from . import jwt_helper
12 from .cookie import Cookie
13 from .authelia_helper import AHAuthError, AutheliaHelper, AHUser
16 SESSION_TIMEOUT = 86400 # Продолжительность сессии по умолчанию.
19 class IDError(Error):
20 """\
21 Общий класс ошибок идентификации
22 """
25 class IDCheckError(IDError):
26 """\
27 Ошибка проверки ID
28 """
31 class IDNotFound(IDCheckError):
32 """\
33 Запрос не идентифицирован
34 """
35 def __init__(self):
36 super().__init__('Запрос не идентифицирован')
39 class UIDError(IDError):
40 """\
41 Ошибки при работе с классом пользовательского идентификатора
42 """
45 class UID(object):
46 """\
47 Класс пользовательского идентификатора.
49 Обеспечивает хранение идентификационной информации о пользователе.
50 """
51 ENV_NAME = 'AW_UID' # Идентификатор для хранения объекта в контексте запроса.
53 def __init__(self, uname: str, acc_level: int = -1,
54 sess_id: Optional[str] = None,
55 acc_tags: Optional[Iterable[str]] = None,
56 user_meta: Optional[Dict[str, str]] = None
57 ):
58 """\
59 :param uname: Имя пользователя внутри системы
60 :param acc_level: Уровень допуска пользователя. Интерпретация целиком на стороне приложения.
61 :param sess_id: Идентификатор сессии пользователя
62 :param acc_tags: Строковые константы, определяющие конкретное множество доступных пользователю ресурсов.
63 Интерпретация целиком на стороне приложения
64 :param user_meta: Иная полезная приложению информация, сохранённая в виде тег-значение
65 """
66 self.name = uname
67 self.level = acc_level
68 self.sess_id = sess_id
69 self._access = acc_tags
70 self._meta = user_meta
72 def get_fp_id(self) -> str:
73 """\
74 Преобразует объект в форму, пригодную для получения подписи сессии
75 """
76 buf = ':'.join(sorted(map(lambda x: str(x).lower().strip(), self.get_access())))
77 if self.level >= 0:
78 buf += f':{self.level}'
80 return f'{self.name}:{buf}'
82 def __str__(self):
83 return self.name
85 def get_access(self):
86 """\
87 Получить теги доступа пользователя, если они имелись
88 """
89 if self._access:
90 return [i for i in self._access]
92 else:
93 return []
95 def get_meta(self):
96 """\
97 Получить метаданные пользователя, если они имелись.
98 """
99 if self._meta:
100 return dict([(k, v) for k, v in self._meta.items()])
102 else:
103 return dict()
105 def to_dict(self) -> Dict[str, str]:
106 """\
107 Преобразование текущего состояния объекта в массив
108 """
109 res = dict(
110 n=self.name,
111 id=self.sess_id
114 if self.level >= 0:
115 res['al'] = self.level
117 if self._access:
118 res['a'] = self.get_access()
120 if self._meta:
121 res['m'] = self.get_meta()
123 return res
125 def to_env(self, request: bottle.BaseRequest = bottle.request):
126 """\
127 Сохранение своего экземпляра в контекст запроса
128 """
129 tools.set_env(self.ENV_NAME, self, request=request)
131 @classmethod
132 def from_dict(cls, d: Dict[str, Any]):
133 """\
134 Разбор словаря, созданного методом ``to_dict`` и восстановление из него экземпляра объекта,
135 подобного сохранённому
136 """
137 params = dict()
139 # User name
140 uname = d.get('n')
141 try:
142 uname = str(uname) if uname is not None else ''
144 except (ValueError, TypeError) as e:
145 raise UIDError(f'Не удалось представить имя пользователя как строку: name="{uname}" err="{e}"')
147 if not uname:
148 raise UIDError(f'Идентификатор пользователя не задан в переданном словаре')
150 params['uname'] = uname
152 # Session Identity
153 sess_id = d.get('id')
154 if sess_id is not None:
155 try:
156 sess_id = str(sess_id)
158 except (TypeError, ValueError) as e:
159 UIDError(f'Не удалось представить идентификатор сессии как строку: '
160 f' sess_id="{sess_id}" '
161 f' err="{e}"')
163 params['sess_id'] = sess_id
165 # Access Level
166 level = d.get('al')
168 if level is not None:
169 if type(level) is not int:
170 try:
171 level = int(level)
173 except (TypeError, ValueError) as e:
174 raise UIDError(f'Не получилось представить уровень допуска пользователя в качестве числа:'
175 f' level="{level}" err="{e}"'
178 params['acc_level'] = level
180 # Access Tags
181 acc_tags = d.get('a')
183 if acc_tags is not None:
184 try:
185 acc_tags = (t for t in iter(acc_tags))
187 except (ValueError, TypeError) as e:
188 raise UIDError(f'Не вышло преобразовать теги доступа к строковому значению: '
189 f' acc_tags="{acc_tags}" '
190 f' err="{e}"')
192 params['acc_tags'] = acc_tags
194 # User Metadata
195 u_meta = d.get('m')
197 if u_meta is not None:
198 if type(u_meta) is not dict:
199 raise UIDError(f'Пользовательские метаданные не имеют нужного типа: {type(u_meta).__name__}')
201 try:
202 u_meta = dict((str(k), str(v)) for k, v in u_meta.items())
204 except (TypeError, ValueError) as e:
205 raise UIDError(f'Не удалось преобразовать в словарь пользовательские метаданные: {e}')
207 params['user_meta'] = u_meta
209 return cls(**params)
211 @classmethod
212 def from_env(cls, request: bottle.BaseRequest = bottle.request):
213 """\
214 Извлечение экземпляра объекта из контекста запроса, если он там сохранён.
215 """
216 uid = tools.get_env(cls.ENV_NAME, request=request)
218 if uid is None:
219 raise IDNotFound()
221 if type(uid) is not cls:
222 raise IDCheckError(f'Неожиданный тип ID: type="{type(uid).__name__}" val="{uid}"')
224 return uid
227 class IDHelper(object):
228 """\
229 Класс поддержки идентификации
230 """
231 def __init__(self,
232 app_name: str,
233 sign_secret: str,
234 authelia_helper: Optional[AutheliaHelper] = None,
235 sess_timeout: int = SESSION_TIMEOUT
236 ):
237 """\
238 :param app_name: Имя приложения. Допускаются цифры символы латиницы и знак ``-``
239 :param authelia_helper: Объект AutheliaHelper с описанием системы аутентификации, если используем Authelia
240 :param sign_secret: Секрет, которым будет подписываться JWT.
241 :param sess_timeout: Время жизни сессии пользователя.
242 """
243 if set(ascii_letters + digits + '-') < set(app_name):
244 _buf = ', '.join(map(lambda x: f'"{x}"', set(app_name) - set(ascii_letters + digits + '-')))
245 raise IDError(f'Не допустимые символы в имени приложения: {_buf}')
247 self.app_name = app_name
248 self.cookie_name = f'X-ID-{self.app_name}'
249 self.sess_timeout = sess_timeout
250 self.jwt_helper = jwt.JWTHelper(sign_secret)
251 self.ah = authelia_helper
253 # Свойства необходимые для работы в качестве плагина Bottle
254 self.name = 'IDHelper'
255 self.api = 2
257 def make_cookie(self, uid: Optional[UID] = None,
258 request: bottle.BaseRequest = bottle.request,
259 is_secure: bool = True,
260 is_httponly: bool = True
261 ) -> Cookie:
262 """\
263 Формируем ``Cookie`` для идентификации сессии пользователя.
265 :param uid: Экземпляр ``UID``, идентифицирующий сессию. Если нет получаем из контекста запроса
266 :param request: Экземпляр запроса, на основании которого формируется ответ.
267 :param is_secure: Установить признак ``secure`` на cookie
268 :param is_httponly: Установить признак ``httponly`` на cookie
269 """
270 if uid is None:
271 uid = UID.from_env(request=request)
273 uid_dict = uid.to_dict()
274 uid_dict['fp'] = tools.get_session_fingerprint(uid.get_fp_id(), request=request)
276 return Cookie(
277 name=self.cookie_name,
278 value=self.jwt_helper.encode(uid.to_dict(), timeout=self.sess_timeout),
279 max_age=self.sess_timeout,
280 secure=is_secure,
281 httponly=is_httponly
284 def set_id(self, uid: Optional[UID] = None, response: bottle.BaseResponse = bottle.response,
285 request: bottle.BaseRequest = bottle.request,
286 is_secure: bool = True,
287 is_httponly: bool = True
288 ):
289 """\
290 Установка cookie на ответ пользователю
292 :param uid: Объект ``UID``, которым идентифицируется сессия. По умолчанию берётся из контекста запроса.
293 :param response: Объект ответа, на который устанавливаем cookie. По умолчанию ``bottle.response``
294 :param request: Экземпляр запроса, на основании которого формируется ответ. По умолчанию ``bottle.request``
295 :param is_secure: Установить признак ``secure`` на cookie
296 :param is_httponly: Установить признак ``httponly`` на cookie
297 """
298 cookie = self.make_cookie(uid=uid, request=request, is_secure=is_secure, is_httponly=is_httponly)
299 cookie.response_add(response)
301 def _get_id_impl(self, request: bottle.BaseRequest = bottle.request):
302 """\
303 Реализация получения ID и сохранения его в контекст запроса для последующего применения
304 в других методах класса.
305 """
306 sid = tools.get_cookie(self.cookie_name)
307 if sid is None:
308 if self.ah is None:
309 IDNotFound()
311 else:
312 try:
313 _ahuser = self.ah(request=request)
314 uid = UID(uname=_ahuser.uname, acc_tags=_ahuser.groups, user_meta={
315 'email': _ahuser.email,
316 'name': _ahuser.name,
317 })
319 uid.to_env(request=request)
321 except AHAuthError:
322 raise IDNotFound()
324 else:
325 try:
326 uid_raw = self.jwt_helper.decode(sid, check_timeout=True)
327 uid = UID.from_dict(uid_raw)
329 fp = tools.get_session_fingerprint(uid.get_fp_id(), request=request)
331 if fp != uid_raw.get('fp'):
332 raise IDCheckError('Проверка подписи сессии не прошла')
334 uid.to_env(request=request)
336 except (jwt.JWTAuthError, UIDError) as e:
337 raise IDCheckError(f'Ошибка проверки ID: {e}')
339 except (jwt.JWTError, IDError) as e:
340 raise IDError(f'Ошибка в обработке ID запроса: {e}')
342 def need_id(self):
343 """\
344 Декоратор для конкретных точек входа приложения
345 """
346 def d(callback):
347 def f(*a, **kwa):
348 self._get_id_impl()
349 return callback(*a, **kwa)
351 return f
353 return d
355 @staticmethod
356 def env_id(request: bottle.BaseRequest = bottle.request) -> UID:
357 """\
358 Получаем ID из контекста запроса (для случая, когда проверка и валидация ID проведена на уровне
359 плагина или декоратора
360 """
361 return UID.from_env(request=request)
363 # Реализация методов для работы в качестве Bottle плагина
364 def setup(self, app: bottle.Bottle):
365 pass
367 def apply(self, callback: Any, route: bottle.Route):
368 def f(*a, **kwa):
369 self._get_id_impl()
371 return callback(*a, **kwa)
373 return f