py.lib

Yohn Y. 2021-10-23 Child:1668cc57225b

21:ad6778cf8cf5 Go to Latest

py.lib/ldap_utils/ldap.py

+ Работа с LDAP

History
1 # coding: utf-8
3 from ldap3 import Server, Connection, SIMPLE, SUBTREE, IP_V4_PREFERRED
4 from ldap3.core.exceptions import LDAPException, LDAPInvalidCredentialsResult
5 from ldap3.utils.conv import escape_filter_chars
6 from collections import namedtuple
7 from typing import List, Optional, Dict, Union
10 class LdapError(Exception):
11 pass
14 class LdapServerError(LdapError):
15 pass
18 class LdapUserError(LdapError):
19 pass
22 class LdapNotFound(LdapUserError):
23 def __init__(self, filter: str, attribs: List[str]):
24 _attribs = ', '.join(attribs)
25 super().__init__(f'Данные согласно фильтру в LDAP не найдены: {filter}, attrib={_attribs}')
28 class LdapAuthDenied(LdapUserError):
29 def __init__(self, user):
30 super(LdapAuthDenied, self).__init__(f'Авторизация пользователя "{user}" отклонена сервером')
33 LdapUserInfo = namedtuple('LdapUserInfo', [
34 'name', 'mail', 'groups',
35 ])
37 ACC_STATUS_ACCOUNTDISABLE = 2
38 ACC_STATUS_LOCKOUT = 16
39 ACC_STATUS_PASSWORD_EXPIRED = 8388608
40 ACC_STATUS_NORMAL_ACCOUNT = 512
41 ACC_STATUS_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT \
42 | ACC_STATUS_PASSWORD_EXPIRED | ACC_STATUS_NORMAL_ACCOUNT
44 ACC_STATUS_FAIL_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT | ACC_STATUS_PASSWORD_EXPIRED
47 class LdapRes(object):
48 def __init__(self, dn: str, attribs: Dict[str, Union[str, List[str]]]):
49 self.dn = dn
50 self.attribs = attribs
52 def __str__(self):
53 return self.dn
56 class LdapConfig(object):
57 def __init__(self,
58 server_uri: str,
59 user_name: str, passwd: str,
60 base_dn: Optional[str] = None,
61 timeout: int = 150, reconnects: int = 3,
62 auth_domain: str = ''
63 ):
65 self.server_uri = server_uri # URL Доступа к северу: 'ldap://servername:port/'
66 self.user_name = user_name # Имя пользователя для подключения к серверу LDAP
67 self.passwd = passwd # Пароль для подключения к серверу LDAP
68 self.base_dn = base_dn # База поиска
69 self.timeout = timeout # Выставляемые таймауты на операцию
70 self.reconnects = reconnects # Количество попыток переподключиться
71 self.auth_domain = auth_domain # Имя домена авторизации
73 def __str__(self):
74 return f'{self.user_name}@{self.server_uri} ({self.base_dn} timeout="{self.timeout}" auth_domain="{self.auth_domain}")'
77 class Ldap(object):
78 escape_filter_chars = escape_filter_chars
80 def __init__(self, config: LdapConfig):
81 self.server = Server(config.server_uri, connect_timeout=config.timeout, mode=IP_V4_PREFERRED)
82 self.config = config
84 def get_connection(self) -> Connection:
85 _reconnects = 1
87 # Подготавливаем имя пользователя к авторизации в AD
88 if self.config.auth_domain != '':
89 _ldap_auth_uname = f'{self.config.auth_domain}\\{self.config.user_name}'
90 else:
91 _ldap_auth_uname = self.config.user_name
93 while True:
94 _reconnects += 1
95 try:
96 ldap_connection = Connection(self.server, authentication=SIMPLE,
97 user=_ldap_auth_uname, password=self.config.passwd,
98 check_names=True,
99 auto_referrals=False, raise_exceptions=True, auto_range=True,
102 ldap_connection.open()
103 if not ldap_connection.bind():
104 continue # Пытаемся переподключиться при ошибках
106 break
108 except LDAPInvalidCredentialsResult:
109 raise LdapAuthDenied(self.config.user_name)
111 except LDAPException as e:
112 if _reconnects > self.config.reconnects:
113 # Возбуждаем исключение после того как попробовали несколько раз
114 raise LdapServerError(f'Ошибка подключения к серверу: {e}')
116 if ldap_connection is None:
117 raise LdapServerError('Не выполнено подключение к серверу')
119 return ldap_connection
121 def search(self, ldap_filter: str,
122 attribs: Optional[List[str]] = None,
123 base_dn: Optional[str] = None):
125 if base_dn is None:
126 if not self.config.base_dn:
127 raise LdapUserError('Не задан Base DN для поиска в LDAP')
129 base_dn = self.config.base_dn
131 if attribs is None:
132 attribs = ['cn']
134 ldap_connection = self.get_connection()
135 ldap_connection.search(base_dn, ldap_filter, attributes=attribs)
136 if ldap_connection.result['result'] != 0:
137 raise LdapServerError(f'Не могу выполнить запрос к серверу LDAP: '
138 f'{ldap_connection.result.get("description")}')
140 for itm in ldap_connection.response:
141 yield LdapRes(dn=itm['dn'], attribs=itm['attributes'])
143 @staticmethod
144 def filter_group_names(group):
145 group_res = list(filter(lambda x: x.lower().startswith('cn'), group.split(',')))
146 if group_res:
147 try:
148 return group_res[0].split('=')[1]
149 except IndexError:
150 raise LdapError(f'Имя группы с неожиданным форматом: {group_res[0]}')
151 else:
152 raise LdapError(f'Нет атрибута CN в имени: {group}')
155 @staticmethod
156 def decode_acc_status(acc_status):
157 res = []
159 if acc_status & ACC_STATUS_ACCOUNTDISABLE != 0:
160 res.append('ACCOUNTDISABLE')
162 if acc_status & ACC_STATUS_NORMAL_ACCOUNT != 0:
163 res.append('NORMAL_ACCOUNT')
165 if acc_status & ACC_STATUS_LOCKOUT != 0:
166 res.append('LOCKOUT')
168 if acc_status & ACC_STATUS_PASSWORD_EXPIRED != 0:
169 res.append('PASSWORD_EXPIRED')
171 return res