py.lib
21:ad6778cf8cf5
Go to Latest
py.lib/ldap_utils/ldap.py
+ Работа с LDAP
3 from ldap3 import Server, Connection, SIMPLE, SUBTREE, IP_V4_PREFERRED
4 from ldap3.core.exceptions import LDAPException, LDAPInvalidCredentialsResult
5 from ldap3.utils.conv import escape_filter_chars
6 from collections import namedtuple
7 from typing import List, Optional, Dict, Union
10 class LdapError(Exception):
14 class LdapServerError(LdapError):
18 class LdapUserError(LdapError):
22 class LdapNotFound(LdapUserError):
23 def __init__(self, filter: str, attribs: List[str]):
24 _attribs = ', '.join(attribs)
25 super().__init__(f'Данные согласно фильтру в LDAP не найдены: {filter}, attrib={_attribs}')
28 class LdapAuthDenied(LdapUserError):
29 def __init__(self, user):
30 super(LdapAuthDenied, self).__init__(f'Авторизация пользователя "{user}" отклонена сервером')
33 LdapUserInfo = namedtuple('LdapUserInfo', [
34 'name', 'mail', 'groups',
37 ACC_STATUS_ACCOUNTDISABLE = 2
38 ACC_STATUS_LOCKOUT = 16
39 ACC_STATUS_PASSWORD_EXPIRED = 8388608
40 ACC_STATUS_NORMAL_ACCOUNT = 512
41 ACC_STATUS_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT \
42 | ACC_STATUS_PASSWORD_EXPIRED | ACC_STATUS_NORMAL_ACCOUNT
44 ACC_STATUS_FAIL_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT | ACC_STATUS_PASSWORD_EXPIRED
47 class LdapRes(object):
48 def __init__(self, dn: str, attribs: Dict[str, Union[str, List[str]]]):
50 self.attribs = attribs
56 class LdapConfig(object):
59 user_name: str, passwd: str,
60 base_dn: Optional[str] = None,
61 timeout: int = 150, reconnects: int = 3,
65 self.server_uri = server_uri # URL Доступа к северу: 'ldap://servername:port/'
66 self.user_name = user_name # Имя пользователя для подключения к серверу LDAP
67 self.passwd = passwd # Пароль для подключения к серверу LDAP
68 self.base_dn = base_dn # База поиска
69 self.timeout = timeout # Выставляемые таймауты на операцию
70 self.reconnects = reconnects # Количество попыток переподключиться
71 self.auth_domain = auth_domain # Имя домена авторизации
74 return f'{self.user_name}@{self.server_uri} ({self.base_dn} timeout="{self.timeout}" auth_domain="{self.auth_domain}")'
78 escape_filter_chars = escape_filter_chars
80 def __init__(self, config: LdapConfig):
81 self.server = Server(config.server_uri, connect_timeout=config.timeout, mode=IP_V4_PREFERRED)
84 def get_connection(self) -> Connection:
87 # Подготавливаем имя пользователя к авторизации в AD
88 if self.config.auth_domain != '':
89 _ldap_auth_uname = f'{self.config.auth_domain}\\{self.config.user_name}'
91 _ldap_auth_uname = self.config.user_name
96 ldap_connection = Connection(self.server, authentication=SIMPLE,
97 user=_ldap_auth_uname, password=self.config.passwd,
99 auto_referrals=False, raise_exceptions=True, auto_range=True,
102 ldap_connection.open()
103 if not ldap_connection.bind():
104 continue # Пытаемся переподключиться при ошибках
108 except LDAPInvalidCredentialsResult:
109 raise LdapAuthDenied(self.config.user_name)
111 except LDAPException as e:
112 if _reconnects > self.config.reconnects:
113 # Возбуждаем исключение после того как попробовали несколько раз
114 raise LdapServerError(f'Ошибка подключения к серверу: {e}')
116 if ldap_connection is None:
117 raise LdapServerError('Не выполнено подключение к серверу')
119 return ldap_connection
121 def search(self, ldap_filter: str,
122 attribs: Optional[List[str]] = None,
123 base_dn: Optional[str] = None):
126 if not self.config.base_dn:
127 raise LdapUserError('Не задан Base DN для поиска в LDAP')
129 base_dn = self.config.base_dn
134 ldap_connection = self.get_connection()
135 ldap_connection.search(base_dn, ldap_filter, attributes=attribs)
136 if ldap_connection.result['result'] != 0:
137 raise LdapServerError(f'Не могу выполнить запрос к серверу LDAP: '
138 f'{ldap_connection.result.get("description")}')
140 for itm in ldap_connection.response:
141 yield LdapRes(dn=itm['dn'], attribs=itm['attributes'])
144 def filter_group_names(group):
145 group_res = list(filter(lambda x: x.lower().startswith('cn'), group.split(',')))
148 return group_res[0].split('=')[1]
150 raise LdapError(f'Имя группы с неожиданным форматом: {group_res[0]}')
152 raise LdapError(f'Нет атрибута CN в имени: {group}')
156 def decode_acc_status(acc_status):
159 if acc_status & ACC_STATUS_ACCOUNTDISABLE != 0:
160 res.append('ACCOUNTDISABLE')
162 if acc_status & ACC_STATUS_NORMAL_ACCOUNT != 0:
163 res.append('NORMAL_ACCOUNT')
165 if acc_status & ACC_STATUS_LOCKOUT != 0:
166 res.append('LOCKOUT')
168 if acc_status & ACC_STATUS_PASSWORD_EXPIRED != 0:
169 res.append('PASSWORD_EXPIRED')