py.lib

Yohn Y. 2022-08-27 Parent:1668cc57225b

42:d9a3784f681b Go to Latest

py.lib/ldap_utils/ldap.py

+ Возможность удаления cookie по полному объекту (полезно, когда выставляются особые параметры на cookie, и их не удаётся удалить по имени)

History
awgur@21 1 # coding: utf-8
awgur@21 2
awgur@21 3 from ldap3 import Server, Connection, SIMPLE, SUBTREE, IP_V4_PREFERRED
awgur@21 4 from ldap3.core.exceptions import LDAPException, LDAPInvalidCredentialsResult
awgur@21 5 from ldap3.utils.conv import escape_filter_chars
awgur@21 6 from collections import namedtuple
awgur@21 7 from typing import List, Optional, Dict, Union
awgur@21 8
awgur@21 9
awgur@21 10 class LdapError(Exception):
awgur@21 11 pass
awgur@21 12
awgur@21 13
awgur@21 14 class LdapServerError(LdapError):
awgur@21 15 pass
awgur@21 16
awgur@21 17
awgur@21 18 class LdapUserError(LdapError):
awgur@21 19 pass
awgur@21 20
awgur@21 21
awgur@21 22 class LdapNotFound(LdapUserError):
awgur@21 23 def __init__(self, filter: str, attribs: List[str]):
awgur@21 24 _attribs = ', '.join(attribs)
awgur@21 25 super().__init__(f'Данные согласно фильтру в LDAP не найдены: {filter}, attrib={_attribs}')
awgur@21 26
awgur@21 27
awgur@21 28 class LdapAuthDenied(LdapUserError):
awgur@21 29 def __init__(self, user):
awgur@21 30 super(LdapAuthDenied, self).__init__(f'Авторизация пользователя "{user}" отклонена сервером')
awgur@21 31
awgur@21 32
awgur@21 33 LdapUserInfo = namedtuple('LdapUserInfo', [
awgur@21 34 'name', 'mail', 'groups',
awgur@21 35 ])
awgur@21 36
awgur@21 37 ACC_STATUS_ACCOUNTDISABLE = 2
awgur@21 38 ACC_STATUS_LOCKOUT = 16
awgur@21 39 ACC_STATUS_PASSWORD_EXPIRED = 8388608
awgur@21 40 ACC_STATUS_NORMAL_ACCOUNT = 512
awgur@21 41 ACC_STATUS_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT \
awgur@21 42 | ACC_STATUS_PASSWORD_EXPIRED | ACC_STATUS_NORMAL_ACCOUNT
awgur@21 43
awgur@21 44 ACC_STATUS_FAIL_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT | ACC_STATUS_PASSWORD_EXPIRED
awgur@21 45
awgur@21 46
awgur@21 47 class LdapRes(object):
awgur@21 48 def __init__(self, dn: str, attribs: Dict[str, Union[str, List[str]]]):
awgur@21 49 self.dn = dn
awgur@21 50 self.attribs = attribs
awgur@21 51
awgur@21 52 def __str__(self):
awgur@21 53 return self.dn
awgur@21 54
awgur@21 55
awgur@21 56 class LdapConfig(object):
awgur@21 57 def __init__(self,
awgur@21 58 server_uri: str,
awgur@21 59 user_name: str, passwd: str,
awgur@21 60 base_dn: Optional[str] = None,
awgur@21 61 timeout: int = 150, reconnects: int = 3,
awgur@21 62 auth_domain: str = ''
awgur@21 63 ):
awgur@21 64
awgur@23 65 self.server_uri = server_uri # URL Доступа к северу: 'ldap://servername:port/'
awgur@23 66 self.user_name = user_name # Имя пользователя для подключения к серверу LDAP
awgur@23 67 self.passwd = passwd # Пароль для подключения к серверу LDAP
awgur@23 68 self.base_dn = base_dn # База поиска
awgur@23 69 self.timeout = timeout # Выставляемые таймауты на операцию
awgur@23 70 self.reconnects = reconnects # Количество попыток переподключиться
awgur@23 71 self.auth_domain = auth_domain # Имя домена авторизации
awgur@21 72
awgur@21 73 def __str__(self):
awgur@23 74 return (f'{self.user_name}@{self.server_uri}'
awgur@23 75 f' ({self.base_dn} timeout="{self.timeout}" auth_domain="{self.auth_domain}")')
awgur@21 76
awgur@21 77
awgur@21 78 class Ldap(object):
awgur@23 79 @staticmethod
awgur@23 80 def filter_chars(val: str, encoding=None) -> str:
awgur@23 81 return escape_filter_chars(val, encoding)
awgur@21 82
awgur@21 83 def __init__(self, config: LdapConfig):
awgur@21 84 self.server = Server(config.server_uri, connect_timeout=config.timeout, mode=IP_V4_PREFERRED)
awgur@21 85 self.config = config
awgur@21 86
awgur@21 87 def get_connection(self) -> Connection:
awgur@21 88 _reconnects = 1
awgur@21 89
awgur@21 90 # Подготавливаем имя пользователя к авторизации в AD
awgur@21 91 if self.config.auth_domain != '':
awgur@21 92 _ldap_auth_uname = f'{self.config.auth_domain}\\{self.config.user_name}'
awgur@21 93 else:
awgur@21 94 _ldap_auth_uname = self.config.user_name
awgur@21 95
awgur@21 96 while True:
awgur@21 97 _reconnects += 1
awgur@21 98 try:
awgur@21 99 ldap_connection = Connection(self.server, authentication=SIMPLE,
awgur@21 100 user=_ldap_auth_uname, password=self.config.passwd,
awgur@21 101 check_names=True,
awgur@21 102 auto_referrals=False, raise_exceptions=True, auto_range=True,
awgur@21 103 )
awgur@21 104
awgur@21 105 ldap_connection.open()
awgur@21 106 if not ldap_connection.bind():
awgur@21 107 continue # Пытаемся переподключиться при ошибках
awgur@21 108
awgur@21 109 break
awgur@21 110
awgur@21 111 except LDAPInvalidCredentialsResult:
awgur@21 112 raise LdapAuthDenied(self.config.user_name)
awgur@21 113
awgur@21 114 except LDAPException as e:
awgur@21 115 if _reconnects > self.config.reconnects:
awgur@21 116 # Возбуждаем исключение после того как попробовали несколько раз
awgur@21 117 raise LdapServerError(f'Ошибка подключения к серверу: {e}')
awgur@21 118
awgur@21 119 if ldap_connection is None:
awgur@21 120 raise LdapServerError('Не выполнено подключение к серверу')
awgur@21 121
awgur@21 122 return ldap_connection
awgur@21 123
awgur@21 124 def search(self, ldap_filter: str,
awgur@21 125 attribs: Optional[List[str]] = None,
awgur@21 126 base_dn: Optional[str] = None):
awgur@21 127
awgur@21 128 if base_dn is None:
awgur@21 129 if not self.config.base_dn:
awgur@21 130 raise LdapUserError('Не задан Base DN для поиска в LDAP')
awgur@21 131
awgur@21 132 base_dn = self.config.base_dn
awgur@21 133
awgur@21 134 if attribs is None:
awgur@21 135 attribs = ['cn']
awgur@21 136
awgur@21 137 ldap_connection = self.get_connection()
awgur@21 138 ldap_connection.search(base_dn, ldap_filter, attributes=attribs)
awgur@21 139 if ldap_connection.result['result'] != 0:
awgur@21 140 raise LdapServerError(f'Не могу выполнить запрос к серверу LDAP: '
awgur@21 141 f'{ldap_connection.result.get("description")}')
awgur@21 142
awgur@21 143 for itm in ldap_connection.response:
awgur@21 144 yield LdapRes(dn=itm['dn'], attribs=itm['attributes'])
awgur@21 145
awgur@21 146 @staticmethod
awgur@21 147 def filter_group_names(group):
awgur@21 148 group_res = list(filter(lambda x: x.lower().startswith('cn'), group.split(',')))
awgur@21 149 if group_res:
awgur@21 150 try:
awgur@21 151 return group_res[0].split('=')[1]
awgur@21 152 except IndexError:
awgur@21 153 raise LdapError(f'Имя группы с неожиданным форматом: {group_res[0]}')
awgur@21 154 else:
awgur@21 155 raise LdapError(f'Нет атрибута CN в имени: {group}')
awgur@21 156
awgur@21 157 @staticmethod
awgur@21 158 def decode_acc_status(acc_status):
awgur@21 159 res = []
awgur@21 160
awgur@21 161 if acc_status & ACC_STATUS_ACCOUNTDISABLE != 0:
awgur@21 162 res.append('ACCOUNTDISABLE')
awgur@21 163
awgur@21 164 if acc_status & ACC_STATUS_NORMAL_ACCOUNT != 0:
awgur@21 165 res.append('NORMAL_ACCOUNT')
awgur@21 166
awgur@21 167 if acc_status & ACC_STATUS_LOCKOUT != 0:
awgur@21 168 res.append('LOCKOUT')
awgur@21 169
awgur@21 170 if acc_status & ACC_STATUS_PASSWORD_EXPIRED != 0:
awgur@21 171 res.append('PASSWORD_EXPIRED')
awgur@21 172
awgur@21 173 return res