py.lib.aw_web_tools

Yohn Y. 2024-02-27 Child:b9fd029be707

5:4d3b509e0967 Go to Latest

py.lib.aw_web_tools/src/aw_web_tools/id_helper.py

+ Реализация плагина идентификации . Сводим все классы исключений к одному предку для удобства

History
1 # coding: utf-8
2 """\
3 Модуль вспомогательных средств обеспечения идентификации пользователей
4 """
5 import bottle
6 from string import ascii_letters, digits
7 from typing import Optional, Dict, Iterable, Any
8 from . import btle_tools as tools
9 from . import Error
10 from . import jwt
11 from .cookie import Cookie
14 SESSION_TIMEOUT = 86400 # Продолжительность сессии по умолчанию.
17 class IDError(Error):
18 """\
19 Общий класс ошибок идентификации
20 """
23 class IDCheckError(IDError):
24 """\
25 Ошибка проверки ID
26 """
29 class IDNotFound(IDCheckError):
30 """\
31 Запрос не идентифицирован
32 """
33 def __init__(self):
34 super().__init__('Запрос не идентифицирован')
37 class UIDError(IDError):
38 """\
39 Ошибки при работе с классом пользовательского идентификатора
40 """
43 class UID(object):
44 """\
45 Класс пользовательского идентификатора.
47 Обеспечивает хранение идентификационной информации о пользователе.
48 """
49 ENV_NAME = 'AW_UID' # Идентификатор для хранения объекта в контексте запроса.
51 def __init__(self, uname: str, acc_level: int = -1,
52 sess_id: Optional[str] = None,
53 acc_tags: Optional[Iterable[str]] = None,
54 user_meta: Optional[Dict[str, str]] = None
55 ):
56 """\
57 :param uname: Имя пользователя внутри системы
58 :param acc_level: Уровень допуска пользователя. Интерпретация целиком на стороне приложения.
59 :param sess_id: Идентификатор сессии пользователя
60 :param acc_tags: Строковые константы, определяющие конкретное множество доступных пользователю ресурсов.
61 Интерпретация целиком на стороне приложения
62 :param user_meta: Иная полезная приложению информация, сохранённая в виде тег-значение
63 """
64 self.name = uname
65 self.level = acc_level
66 self.sess_id = sess_id
67 self._access = acc_tags
68 self._meta = user_meta
70 def get_fp_id(self) -> str:
71 """\
72 Преобразует объект в форму, пригодную для получения подписи сессии
73 """
74 buf = ':'.join(sorted(map(lambda x: str(x).lower().strip(), self.get_access())))
75 if self.level >= 0:
76 buf += f':{self.level}'
78 return f'{self.name}:{buf}'
80 def __str__(self):
81 return self.name
83 def get_access(self):
84 """\
85 Получить теги доступа пользователя, если они имелись
86 """
87 if self._access:
88 return [i for i in self._access]
90 else:
91 return []
93 def get_meta(self):
94 """\
95 Получить метаданные пользователя, если они имелись.
96 """
97 if self._meta:
98 return dict([(k, v) for k, v in self._meta.items()])
100 else:
101 return dict()
103 def to_dict(self) -> Dict[str, str]:
104 """\
105 Преобразование текущего состояния объекта в массив
106 """
107 res = dict(
108 n=self.name,
109 id=self.sess_id
112 if self.level >= 0:
113 res['al'] = self.level
115 if self._access:
116 res['a'] = self.get_access()
118 if self._meta:
119 res['m'] = self.get_meta()
121 return res
123 def to_env(self, request: bottle.BaseRequest = bottle.request):
124 """\
125 Сохранение своего экземпляра в контекст запроса
126 """
127 tools.set_env(self.ENV_NAME, self, request=request)
129 @classmethod
130 def from_dict(cls, d: Dict[str, Any]):
131 """\
132 Разбор словаря, созданного методом ``to_dict`` и восстановление из него экземпляра объекта,
133 подобного сохранённому
134 """
135 params = dict()
137 # User name
138 uname = d.get('n')
139 try:
140 uname = str(uname) if uname is not None else ''
142 except (ValueError, TypeError) as e:
143 raise UIDError(f'Не удалось представить имя пользователя как строку: name="{uname}" err="{e}"')
145 if not uname:
146 raise UIDError(f'Идентификатор пользователя не задан в переданном словаре')
148 params['uname'] = uname
150 # Session Identity
151 sess_id = d.get('id')
152 if sess_id is not None:
153 try:
154 sess_id = str(sess_id)
156 except (TypeError, ValueError) as e:
157 UIDError(f'Не удалось представить идентификатор сессии как строку: '
158 f' sess_id="{sess_id}" '
159 f' err="{e}"')
161 params['sess_id'] = sess_id
163 # Access Level
164 level = d.get('al')
166 if level is not None:
167 if type(level) is not int:
168 try:
169 level = int(level)
171 except (TypeError, ValueError) as e:
172 raise UIDError(f'Не получилось представить уровень допуска пользователя в качестве числа:'
173 f' level="{level}" err="{e}"'
176 params['acc_level'] = level
178 # Access Tags
179 acc_tags = d.get('a')
181 if acc_tags is not None:
182 try:
183 acc_tags = (t for t in iter(acc_tags))
185 except (ValueError, TypeError) as e:
186 raise UIDError(f'Не вышло преобразовать теги доступа к строковому значению: '
187 f' acc_tags="{acc_tags}" '
188 f' err="{e}"')
190 params['acc_tags'] = acc_tags
192 # User Metadata
193 u_meta = d.get('m')
195 if u_meta is not None:
196 if type(u_meta) is not dict:
197 raise UIDError(f'Пользовательские метаданные не имеют нужного типа: {type(u_meta).__name__}')
199 try:
200 u_meta = dict((str(k), str(v)) for k, v in u_meta.items())
202 except (TypeError, ValueError) as e:
203 raise UIDError(f'Не удалось преобразовать в словарь пользовательские метаданные: {e}')
205 params['user_meta'] = u_meta
207 return cls(**params)
209 @classmethod
210 def from_env(cls, request: bottle.BaseRequest = bottle.request):
211 """\
212 Извлечение экземпляра объекта из контекста запроса, если он там сохранён.
213 """
214 uid = tools.get_env(cls.ENV_NAME, request=request)
216 if uid is None:
217 raise IDNotFound()
219 if type(uid) is not cls:
220 raise IDCheckError(f'Неожиданный тип ID: type="{type(uid).__name__}" val="{uid}"')
222 return uid
225 class IDHelper(object):
226 """\
227 Класс поддержки идентификации
228 """
229 def __init__(self,
230 app_name: str,
231 sign_secret: str,
232 sess_timeout: int = SESSION_TIMEOUT
233 ):
234 """\
235 :param app_name: Имя приложения. Допускаются цифры символы латиницы и знак ``-``
236 :param sign_secret: Секрет, которым будет подписываться JWT.
237 :param sess_timeout: Время жизни сессии пользователя.
238 """
239 if set(ascii_letters + digits + '-') < set(app_name):
240 _buf = ', '.join(map(lambda x: f'"{x}"', set(app_name) - set(ascii_letters + digits + '-')))
241 raise IDError(f'Не допустимые символы в имени приложения: {_buf}')
243 self.app_name = app_name
244 self.cookie_name = f'X-ID-{self.app_name}'
245 self.sess_timeout = sess_timeout
246 self.jwt_helper = jwt.JWTHelper(sign_secret)
248 # Свойства необходимые для работы в качестве плагина Bottle
249 self.name = 'IDHelper'
250 self.api = 2
252 def make_cookie(self, uid: Optional[UID] = None,
253 request: bottle.BaseRequest = bottle.request,
254 is_secure: bool = True,
255 is_httponly: bool = True
256 ) -> Cookie:
257 """\
258 Формируем ``Cookie`` для идентификации сессии пользователя.
260 :param uid: Экземпляр ``UID``, идентифицирующий сессию. Если нет получаем из контекста запроса
261 :param request: Экземпляр запроса, на основании которого формируется ответ.
262 :param is_secure: Установить признак ``secure`` на cookie
263 :param is_httponly: Установить признак ``httponly`` на cookie
264 """
265 if uid is None:
266 uid = UID.from_env(request=request)
268 uid_dict = uid.to_dict()
269 uid_dict['fp'] = tools.get_session_fingerprint(uid.get_fp_id(), request=request)
271 return Cookie(
272 name=self.cookie_name,
273 value=self.jwt_helper.encode(uid.to_dict(), timeout=self.sess_timeout),
274 max_age=self.sess_timeout,
275 secure=is_secure,
276 httponly=is_httponly
279 def set_id(self, uid: Optional[UID] = None, response: bottle.BaseResponse = bottle.response,
280 request: bottle.BaseRequest = bottle.request,
281 is_secure: bool = True,
282 is_httponly: bool = True
283 ):
284 """\
285 Установка cookie на ответ пользователю
287 :param uid: Объект ``UID``, которым идентифицируется сессия. По умолчанию берётся из контекста запроса.
288 :param response: Объект ответа, на который устанавливаем cookie. По умолчанию ``bottle.response``
289 :param request: Экземпляр запроса, на основании которого формируется ответ. По умолчанию ``bottle.request``
290 :param is_secure: Установить признак ``secure`` на cookie
291 :param is_httponly: Установить признак ``httponly`` на cookie
292 """
293 cookie = self.make_cookie(uid=uid, request=request, is_secure=is_secure, is_httponly=is_httponly)
294 cookie.response_add(response)
296 def _get_id_impl(self, request: bottle.BaseRequest = bottle.request):
297 """\
298 Реализация получения ID и сохранения его в контекст запроса для последующего применения
299 в других методах класса.
300 """
301 sid = tools.get_cookie(self.cookie_name)
302 if sid is None:
303 raise IDNotFound()
305 try:
306 uid_raw = self.jwt_helper.decode(sid, check_timeout=True)
307 uid = UID.from_dict(uid_raw)
309 fp = tools.get_session_fingerprint(uid.get_fp_id(), request=request)
311 if fp != uid_raw.get('fp'):
312 raise IDCheckError('Проверка подписи сессии не прошла')
314 uid.to_env(request=request)
316 except (jwt.JWTAuthError, UIDError) as e:
317 raise IDCheckError(f'Ошибка проверки ID: {e}')
319 except (jwt.JWTError, IDError) as e:
320 raise IDError(f'Ошибка в обработке ID запроса: {e}')
322 def need_id(self):
323 """\
324 Декоратор для конкретных точек входа приложения
325 """
326 def d(callback):
327 def f(*a, **kwa):
328 self._get_id_impl()
329 return callback(*a, **kwa)
331 return f
333 return d
335 @staticmethod
336 def env_id(request: bottle.BaseRequest = bottle.request) -> UID:
337 """\
338 Получаем ID из контекста запроса (для случая, когда проверка и валидация ID проведена на уровне
339 плагина или декоратора
340 """
341 return UID.from_env(request=request)
343 # Реализация методов для работы в качестве Bottle плагина
344 def setup(self, app: bottle.Bottle):
345 pass
347 def apply(self, callback: Any, route: bottle.Route):
348 def f(*a, **kwa):
349 self._get_id_impl()
351 return callback(*a, **kwa)
353 return f