py.lib
23:1668cc57225b
Go to Latest
py.lib/ldap_utils/ldap.py
. Рефакторинг бессмысленный и беспощадный
3 from ldap3 import Server, Connection, SIMPLE, SUBTREE, IP_V4_PREFERRED
4 from ldap3.core.exceptions import LDAPException, LDAPInvalidCredentialsResult
5 from ldap3.utils.conv import escape_filter_chars
6 from collections import namedtuple
7 from typing import List, Optional, Dict, Union
10 class LdapError(Exception):
14 class LdapServerError(LdapError):
18 class LdapUserError(LdapError):
22 class LdapNotFound(LdapUserError):
23 def __init__(self, filter: str, attribs: List[str]):
24 _attribs = ', '.join(attribs)
25 super().__init__(f'Данные согласно фильтру в LDAP не найдены: {filter}, attrib={_attribs}')
28 class LdapAuthDenied(LdapUserError):
29 def __init__(self, user):
30 super(LdapAuthDenied, self).__init__(f'Авторизация пользователя "{user}" отклонена сервером')
33 LdapUserInfo = namedtuple('LdapUserInfo', [
34 'name', 'mail', 'groups',
37 ACC_STATUS_ACCOUNTDISABLE = 2
38 ACC_STATUS_LOCKOUT = 16
39 ACC_STATUS_PASSWORD_EXPIRED = 8388608
40 ACC_STATUS_NORMAL_ACCOUNT = 512
41 ACC_STATUS_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT \
42 | ACC_STATUS_PASSWORD_EXPIRED | ACC_STATUS_NORMAL_ACCOUNT
44 ACC_STATUS_FAIL_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT | ACC_STATUS_PASSWORD_EXPIRED
47 class LdapRes(object):
48 def __init__(self, dn: str, attribs: Dict[str, Union[str, List[str]]]):
50 self.attribs = attribs
56 class LdapConfig(object):
59 user_name: str, passwd: str,
60 base_dn: Optional[str] = None,
61 timeout: int = 150, reconnects: int = 3,
65 self.server_uri = server_uri # URL Доступа к северу: 'ldap://servername:port/'
66 self.user_name = user_name # Имя пользователя для подключения к серверу LDAP
67 self.passwd = passwd # Пароль для подключения к серверу LDAP
68 self.base_dn = base_dn # База поиска
69 self.timeout = timeout # Выставляемые таймауты на операцию
70 self.reconnects = reconnects # Количество попыток переподключиться
71 self.auth_domain = auth_domain # Имя домена авторизации
74 return (f'{self.user_name}@{self.server_uri}'
75 f' ({self.base_dn} timeout="{self.timeout}" auth_domain="{self.auth_domain}")')
80 def filter_chars(val: str, encoding=None) -> str:
81 return escape_filter_chars(val, encoding)
83 def __init__(self, config: LdapConfig):
84 self.server = Server(config.server_uri, connect_timeout=config.timeout, mode=IP_V4_PREFERRED)
87 def get_connection(self) -> Connection:
90 # Подготавливаем имя пользователя к авторизации в AD
91 if self.config.auth_domain != '':
92 _ldap_auth_uname = f'{self.config.auth_domain}\\{self.config.user_name}'
94 _ldap_auth_uname = self.config.user_name
99 ldap_connection = Connection(self.server, authentication=SIMPLE,
100 user=_ldap_auth_uname, password=self.config.passwd,
102 auto_referrals=False, raise_exceptions=True, auto_range=True,
105 ldap_connection.open()
106 if not ldap_connection.bind():
107 continue # Пытаемся переподключиться при ошибках
111 except LDAPInvalidCredentialsResult:
112 raise LdapAuthDenied(self.config.user_name)
114 except LDAPException as e:
115 if _reconnects > self.config.reconnects:
116 # Возбуждаем исключение после того как попробовали несколько раз
117 raise LdapServerError(f'Ошибка подключения к серверу: {e}')
119 if ldap_connection is None:
120 raise LdapServerError('Не выполнено подключение к серверу')
122 return ldap_connection
124 def search(self, ldap_filter: str,
125 attribs: Optional[List[str]] = None,
126 base_dn: Optional[str] = None):
129 if not self.config.base_dn:
130 raise LdapUserError('Не задан Base DN для поиска в LDAP')
132 base_dn = self.config.base_dn
137 ldap_connection = self.get_connection()
138 ldap_connection.search(base_dn, ldap_filter, attributes=attribs)
139 if ldap_connection.result['result'] != 0:
140 raise LdapServerError(f'Не могу выполнить запрос к серверу LDAP: '
141 f'{ldap_connection.result.get("description")}')
143 for itm in ldap_connection.response:
144 yield LdapRes(dn=itm['dn'], attribs=itm['attributes'])
147 def filter_group_names(group):
148 group_res = list(filter(lambda x: x.lower().startswith('cn'), group.split(',')))
151 return group_res[0].split('=')[1]
153 raise LdapError(f'Имя группы с неожиданным форматом: {group_res[0]}')
155 raise LdapError(f'Нет атрибута CN в имени: {group}')
158 def decode_acc_status(acc_status):
161 if acc_status & ACC_STATUS_ACCOUNTDISABLE != 0:
162 res.append('ACCOUNTDISABLE')
164 if acc_status & ACC_STATUS_NORMAL_ACCOUNT != 0:
165 res.append('NORMAL_ACCOUNT')
167 if acc_status & ACC_STATUS_LOCKOUT != 0:
168 res.append('LOCKOUT')
170 if acc_status & ACC_STATUS_PASSWORD_EXPIRED != 0:
171 res.append('PASSWORD_EXPIRED')