py.lib.aw_web_tools

Yohn Y. 2025-10-15 Parent:9c734271dcf2

14:0920ae304dfd Go to Latest

py.lib.aw_web_tools/src/aw_web_tools/id_helper.py

.. 1.202510.1 + Режим фековой авторизации для адаптера authelia. Режим требуется для отладки, поскольку вряд ли на машине разработчика будет развёрнуто это ПО на ранних стадиях разработки (преальфа). Возможно режим будет полезен при поиска проблем в приложении при авторизации.

History
1 # coding: utf-8
2 """\
3 Модуль вспомогательных средств обеспечения идентификации пользователей
4 """
5 import bottle
6 from string import ascii_letters, digits
7 from typing import Optional, Dict, Iterable, Any
9 from . import btle_tools as tools
10 from . import Error
11 from . import jwt_helper
12 from .cookie import Cookie
13 from .authelia_helper import AHAuthError, AutheliaHelper, AHUser
16 SESSION_TIMEOUT = 86400 # Продолжительность сессии по умолчанию.
19 class IDError(Error):
20 """\
21 Общий класс ошибок идентификации
22 """
25 class IDCheckError(IDError):
26 """\
27 Ошибка проверки ID
28 """
31 class IDNotFound(IDCheckError):
32 """\
33 Запрос не идентифицирован
34 """
35 def __init__(self):
36 super().__init__('Запрос не идентифицирован')
39 class UIDError(IDError):
40 """\
41 Ошибки при работе с классом пользовательского идентификатора
42 """
45 class UID(object):
46 """\
47 Класс пользовательского идентификатора.
49 Обеспечивает хранение идентификационной информации о пользователе.
50 """
51 ENV_NAME = 'AW_UID' # Идентификатор для хранения объекта в контексте запроса.
53 def __init__(self, uname: str, acc_level: int = -1,
54 sess_id: Optional[str] = None,
55 acc_tags: Optional[Iterable[str]] = None,
56 user_meta: Optional[Dict[str, str]] = None
57 ):
58 """\
59 :param uname: Имя пользователя внутри системы
60 :param acc_level: Уровень допуска пользователя. Интерпретация целиком на стороне приложения.
61 :param sess_id: Идентификатор сессии пользователя
62 :param acc_tags: Строковые константы, определяющие конкретное множество доступных пользователю ресурсов.
63 Интерпретация целиком на стороне приложения
64 :param user_meta: Иная полезная приложению информация, сохранённая в виде тег-значение
65 """
66 self.name = uname
67 self.level = acc_level
68 self.sess_id = sess_id
69 self._access = acc_tags
70 self._meta = user_meta
72 def get_fp_id(self) -> str:
73 """\
74 Преобразует объект в форму, пригодную для получения подписи сессии
75 """
76 buf = ':'.join(sorted(map(lambda x: str(x).lower().strip(), self.get_access())))
77 if self.level >= 0:
78 buf += f':{self.level}'
80 return f'{self.name}:{buf}'
82 def __str__(self):
83 return self.name
85 def get_access(self):
86 """\
87 Получить теги доступа пользователя, если они имелись
88 """
89 if self._access:
90 return [i for i in self._access]
92 else:
93 return []
95 def get_meta(self):
96 """\
97 Получить метаданные пользователя, если они имелись.
98 """
99 if self._meta:
100 return dict([(k, v) for k, v in self._meta.items()])
102 else:
103 return dict()
105 def to_dict(self) -> Dict[str, str]:
106 """\
107 Преобразование текущего состояния объекта в массив
108 """
109 res = dict(
110 n=self.name,
111 id=self.sess_id
114 if self.level >= 0:
115 res['al'] = self.level
117 if self._access:
118 res['a'] = self.get_access()
120 if self._meta:
121 res['m'] = self.get_meta()
123 return res
125 def to_env(self, request: bottle.BaseRequest = bottle.request):
126 """\
127 Сохранение своего экземпляра в контекст запроса
128 """
129 tools.set_env(self.ENV_NAME, self, request=request)
131 @classmethod
132 def from_dict(cls, d: Dict[str, Any]):
133 """\
134 Разбор словаря, созданного методом ``to_dict`` и восстановление из него экземпляра объекта,
135 подобного сохранённому
136 """
137 params = dict()
139 # User name
140 uname = d.get('n')
141 try:
142 uname = str(uname) if uname is not None else ''
144 except (ValueError, TypeError) as e:
145 raise UIDError(f'Не удалось представить имя пользователя как строку: name="{uname}" err="{e}"')
147 if not uname:
148 raise UIDError(f'Идентификатор пользователя не задан в переданном словаре')
150 params['uname'] = uname
152 # Session Identity
153 sess_id = d.get('id')
154 if sess_id is not None:
155 try:
156 sess_id = str(sess_id)
158 except (TypeError, ValueError) as e:
159 UIDError(f'Не удалось представить идентификатор сессии как строку: '
160 f' sess_id="{sess_id}" '
161 f' err="{e}"')
163 params['sess_id'] = sess_id
165 # Access Level
166 level = d.get('al')
168 if level is not None:
169 if type(level) is not int:
170 try:
171 level = int(level)
173 except (TypeError, ValueError) as e:
174 raise UIDError(f'Не получилось представить уровень допуска пользователя в качестве числа:'
175 f' level="{level}" err="{e}"'
178 params['acc_level'] = level
180 # Access Tags
181 acc_tags = d.get('a')
183 if acc_tags is not None:
184 try:
185 acc_tags = (t for t in iter(acc_tags))
187 except (ValueError, TypeError) as e:
188 raise UIDError(f'Не вышло преобразовать теги доступа к строковому значению: '
189 f' acc_tags="{acc_tags}" '
190 f' err="{e}"')
192 params['acc_tags'] = acc_tags
194 # User Metadata
195 u_meta = d.get('m')
197 if u_meta is not None:
198 if type(u_meta) is not dict:
199 raise UIDError(f'Пользовательские метаданные не имеют нужного типа: {type(u_meta).__name__}')
201 try:
202 u_meta = dict((str(k), str(v)) for k, v in u_meta.items())
204 except (TypeError, ValueError) as e:
205 raise UIDError(f'Не удалось преобразовать в словарь пользовательские метаданные: {e}')
207 params['user_meta'] = u_meta
209 return cls(**params)
211 @classmethod
212 def from_env(cls, request: bottle.BaseRequest = bottle.request):
213 """\
214 Извлечение экземпляра объекта из контекста запроса, если он там сохранён.
215 """
216 uid = tools.get_env(cls.ENV_NAME, request=request)
218 if uid is None:
219 raise IDNotFound()
221 if type(uid) is not cls:
222 raise IDCheckError(f'Неожиданный тип ID: type="{type(uid).__name__}" val="{uid}"')
224 return uid
227 class IDHelper(object):
228 """\
229 Класс поддержки идентификации
230 """
231 def __init__(self,
232 app_name: str,
233 sign_secret: str,
234 authelia_helper: Optional[AutheliaHelper] = None,
235 sess_timeout: int = SESSION_TIMEOUT
236 ):
237 """\
238 :param app_name: Имя приложения. Допускаются цифры символы латиницы и знак ``-``
239 :param authelia_helper: Объект AutheliaHelper с описанием системы аутентификации, если используем Authelia
240 :param sign_secret: Секрет, которым будет подписываться JWT.
241 :param sess_timeout: Время жизни сессии пользователя.
242 """
243 if set(ascii_letters + digits + '-') < set(app_name):
244 _buf = ', '.join(map(lambda x: f'"{x}"', set(app_name) - set(ascii_letters + digits + '-')))
245 raise IDError(f'Не допустимые символы в имени приложения: {_buf}')
247 self.app_name = app_name
248 self.cookie_name = f'X-ID-{self.app_name}'
249 self.sess_timeout = sess_timeout
250 self.jwt_helper = jwt.JWTHelper(sign_secret)
251 self.ah = authelia_helper
253 # Свойства необходимые для работы в качестве плагина Bottle
254 self.name = 'IDHelper'
255 self.api = 2
257 def make_cookie(self, uid: Optional[UID] = None,
258 request: bottle.BaseRequest = bottle.request,
259 is_secure: bool = True,
260 is_httponly: bool = True
261 ) -> Cookie:
262 """\
263 Формируем ``Cookie`` для идентификации сессии пользователя.
265 :param uid: Экземпляр ``UID``, идентифицирующий сессию. Если нет получаем из контекста запроса
266 :param request: Экземпляр запроса, на основании которого формируется ответ.
267 :param is_secure: Установить признак ``secure`` на cookie
268 :param is_httponly: Установить признак ``httponly`` на cookie
269 """
270 if uid is None:
271 uid = UID.from_env(request=request)
273 uid_dict = uid.to_dict()
274 uid_dict['fp'] = tools.get_session_fingerprint(uid.get_fp_id(), request=request)
276 return Cookie(
277 name=self.cookie_name,
278 value=self.jwt_helper.encode(uid.to_dict(), timeout=self.sess_timeout),
279 max_age=self.sess_timeout,
280 secure=is_secure,
281 httponly=is_httponly
284 def set_id(self, uid: Optional[UID] = None, response: bottle.BaseResponse = bottle.response,
285 request: bottle.BaseRequest = bottle.request,
286 is_secure: bool = True,
287 is_httponly: bool = True
288 ):
289 """\
290 Установка cookie на ответ пользователю
292 :param uid: Объект ``UID``, которым идентифицируется сессия. По умолчанию берётся из контекста запроса.
293 :param response: Объект ответа, на который устанавливаем cookie. По умолчанию ``bottle.response``
294 :param request: Экземпляр запроса, на основании которого формируется ответ. По умолчанию ``bottle.request``
295 :param is_secure: Установить признак ``secure`` на cookie
296 :param is_httponly: Установить признак ``httponly`` на cookie
297 """
298 cookie = self.make_cookie(uid=uid, request=request, is_secure=is_secure, is_httponly=is_httponly)
299 cookie.response_add(response)
301 def _get_id_impl(self, request: bottle.BaseRequest = bottle.request):
302 """\
303 Реализация получения ID и сохранения его в контекст запроса для последующего применения
304 в других методах класса.
305 """
306 sid = tools.get_cookie(self.cookie_name)
307 if sid is None:
308 if self.ah is None:
309 IDNotFound()
311 else:
312 try:
313 _ahuser = self.ah(request=request)
314 uid = UID(uname=_ahuser.uname, acc_tags=_ahuser.groups, user_meta={
315 'email': _ahuser.email,
316 'name': _ahuser.name,
317 })
319 uid.to_env(request=request)
321 except AHAuthError:
322 raise IDNotFound()
324 else:
325 try:
326 uid_raw = self.jwt_helper.decode(sid, check_timeout=True)
327 uid = UID.from_dict(uid_raw)
329 fp = tools.get_session_fingerprint(uid.get_fp_id(), request=request)
331 if fp != uid_raw.get('fp'):
332 raise IDCheckError('Проверка подписи сессии не прошла')
334 uid.to_env(request=request)
336 except (jwt.JWTAuthError, UIDError) as e:
337 raise IDCheckError(f'Ошибка проверки ID: {e}')
339 except (jwt.JWTError, IDError) as e:
340 raise IDError(f'Ошибка в обработке ID запроса: {e}')
342 def need_id(self):
343 """\
344 Декоратор для конкретных точек входа приложения
345 """
346 def d(callback):
347 def f(*a, **kwa):
348 self._get_id_impl()
349 return callback(*a, **kwa)
351 return f
353 return d
355 @staticmethod
356 def env_id(request: bottle.BaseRequest = bottle.request) -> UID:
357 """\
358 Получаем ID из контекста запроса (для случая, когда проверка и валидация ID проведена на уровне
359 плагина или декоратора
360 """
361 return UID.from_env(request=request)
363 # Реализация методов для работы в качестве Bottle плагина
364 def setup(self, app: bottle.Bottle):
365 pass
367 def apply(self, callback: Any, route: bottle.Route):
368 def f(*a, **kwa):
369 self._get_id_impl()
371 return callback(*a, **kwa)
373 return f