py.lib
py.lib/webapp/jwt_util.py
* С разбегу влетел в type hinting, пришлось обкладывать костылями. И этот процесс судя по всему длительный, и место больное будет долго...
| awgur@31 | 1 # coding: utf-8 |
| awgur@31 | 2 |
| awgur@31 | 3 import jwt |
| awgur@31 | 4 from datetime import datetime, timedelta |
| awgur@31 | 5 from typing import Optional |
| awgur@31 | 6 |
| awgur@31 | 7 |
| awgur@31 | 8 JWT_HASH_ALGO = 'HS512' |
| awgur@31 | 9 |
| awgur@31 | 10 |
| awgur@31 | 11 class JWTError(Exception): |
| awgur@31 | 12 pass |
| awgur@31 | 13 |
| awgur@31 | 14 |
| awgur@31 | 15 class JWTAuthError(JWTError): |
| awgur@31 | 16 """\ |
| awgur@31 | 17 Провалена проверка токена на допустимость по подписи времени или прочее |
| awgur@31 | 18 """ |
| awgur@31 | 19 |
| awgur@31 | 20 |
| awgur@31 | 21 class JWTHelper(object): |
| awgur@31 | 22 def __init__(self, key: str): |
| awgur@31 | 23 self.key = key |
| awgur@31 | 24 |
| awgur@31 | 25 def encode(self, data: dict, timeout: Optional[int] = None) -> str: |
| awgur@31 | 26 if timeout is not None: |
| awgur@31 | 27 data['exp'] = datetime.utcnow() + timedelta(seconds=timeout) |
| awgur@31 | 28 |
| awgur@31 | 29 return jwt.encode(data, key=self.key, algorithm=JWT_HASH_ALGO) |
| awgur@31 | 30 |
| awgur@31 | 31 def decode(self, token: str, check_timeout: bool = False) -> dict: |
| awgur@31 | 32 opts = { |
| awgur@31 | 33 'algorithms': [JWT_HASH_ALGO, ] |
| awgur@31 | 34 } |
| awgur@31 | 35 |
| awgur@31 | 36 if check_timeout: |
| awgur@31 | 37 opts['options'] = {'require': {'exp'}} |
| awgur@31 | 38 |
| awgur@31 | 39 if token is None: |
| awgur@31 | 40 raise JWTAuthError('Ключ отсутствует') |
| awgur@31 | 41 else: |
| awgur@31 | 42 token = token.encode('utf-8') |
| awgur@31 | 43 |
| awgur@31 | 44 try: |
| awgur@31 | 45 return jwt.decode(jwt=token, key=self.key, **opts) |
| awgur@31 | 46 |
| awgur@31 | 47 except (jwt.InvalidIssuerError, jwt.InvalidSignatureError, jwt.ExpiredSignatureError) as e: |
| awgur@31 | 48 raise JWTAuthError(str(e)) |
| awgur@31 | 49 |
| awgur@31 | 50 except jwt.PyJWTError as e: |
| awgur@31 | 51 raise JWTError(f'{type(e).__name__}: {e}') |
| awgur@31 | 52 |
| awgur@31 | 53 @classmethod |
| awgur@31 | 54 def make_cls_fabric(cls, key: str): |
| awgur@31 | 55 def f(): |
| awgur@31 | 56 return cls(key=key) |
| awgur@31 | 57 |
| awgur@31 | 58 return f |