py.lib

Yohn Y. 2021-10-23 Child:1668cc57225b

21:ad6778cf8cf5 Go to Latest

py.lib/ldap_utils/ldap.py

+ Работа с LDAP

History
awgur@21 1 # coding: utf-8
awgur@21 2
awgur@21 3 from ldap3 import Server, Connection, SIMPLE, SUBTREE, IP_V4_PREFERRED
awgur@21 4 from ldap3.core.exceptions import LDAPException, LDAPInvalidCredentialsResult
awgur@21 5 from ldap3.utils.conv import escape_filter_chars
awgur@21 6 from collections import namedtuple
awgur@21 7 from typing import List, Optional, Dict, Union
awgur@21 8
awgur@21 9
awgur@21 10 class LdapError(Exception):
awgur@21 11 pass
awgur@21 12
awgur@21 13
awgur@21 14 class LdapServerError(LdapError):
awgur@21 15 pass
awgur@21 16
awgur@21 17
awgur@21 18 class LdapUserError(LdapError):
awgur@21 19 pass
awgur@21 20
awgur@21 21
awgur@21 22 class LdapNotFound(LdapUserError):
awgur@21 23 def __init__(self, filter: str, attribs: List[str]):
awgur@21 24 _attribs = ', '.join(attribs)
awgur@21 25 super().__init__(f'Данные согласно фильтру в LDAP не найдены: {filter}, attrib={_attribs}')
awgur@21 26
awgur@21 27
awgur@21 28 class LdapAuthDenied(LdapUserError):
awgur@21 29 def __init__(self, user):
awgur@21 30 super(LdapAuthDenied, self).__init__(f'Авторизация пользователя "{user}" отклонена сервером')
awgur@21 31
awgur@21 32
awgur@21 33 LdapUserInfo = namedtuple('LdapUserInfo', [
awgur@21 34 'name', 'mail', 'groups',
awgur@21 35 ])
awgur@21 36
awgur@21 37 ACC_STATUS_ACCOUNTDISABLE = 2
awgur@21 38 ACC_STATUS_LOCKOUT = 16
awgur@21 39 ACC_STATUS_PASSWORD_EXPIRED = 8388608
awgur@21 40 ACC_STATUS_NORMAL_ACCOUNT = 512
awgur@21 41 ACC_STATUS_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT \
awgur@21 42 | ACC_STATUS_PASSWORD_EXPIRED | ACC_STATUS_NORMAL_ACCOUNT
awgur@21 43
awgur@21 44 ACC_STATUS_FAIL_MASK = ACC_STATUS_ACCOUNTDISABLE | ACC_STATUS_LOCKOUT | ACC_STATUS_PASSWORD_EXPIRED
awgur@21 45
awgur@21 46
awgur@21 47 class LdapRes(object):
awgur@21 48 def __init__(self, dn: str, attribs: Dict[str, Union[str, List[str]]]):
awgur@21 49 self.dn = dn
awgur@21 50 self.attribs = attribs
awgur@21 51
awgur@21 52 def __str__(self):
awgur@21 53 return self.dn
awgur@21 54
awgur@21 55
awgur@21 56 class LdapConfig(object):
awgur@21 57 def __init__(self,
awgur@21 58 server_uri: str,
awgur@21 59 user_name: str, passwd: str,
awgur@21 60 base_dn: Optional[str] = None,
awgur@21 61 timeout: int = 150, reconnects: int = 3,
awgur@21 62 auth_domain: str = ''
awgur@21 63 ):
awgur@21 64
awgur@21 65 self.server_uri = server_uri # URL Доступа к северу: 'ldap://servername:port/'
awgur@21 66 self.user_name = user_name # Имя пользователя для подключения к серверу LDAP
awgur@21 67 self.passwd = passwd # Пароль для подключения к серверу LDAP
awgur@21 68 self.base_dn = base_dn # База поиска
awgur@21 69 self.timeout = timeout # Выставляемые таймауты на операцию
awgur@21 70 self.reconnects = reconnects # Количество попыток переподключиться
awgur@21 71 self.auth_domain = auth_domain # Имя домена авторизации
awgur@21 72
awgur@21 73 def __str__(self):
awgur@21 74 return f'{self.user_name}@{self.server_uri} ({self.base_dn} timeout="{self.timeout}" auth_domain="{self.auth_domain}")'
awgur@21 75
awgur@21 76
awgur@21 77 class Ldap(object):
awgur@21 78 escape_filter_chars = escape_filter_chars
awgur@21 79
awgur@21 80 def __init__(self, config: LdapConfig):
awgur@21 81 self.server = Server(config.server_uri, connect_timeout=config.timeout, mode=IP_V4_PREFERRED)
awgur@21 82 self.config = config
awgur@21 83
awgur@21 84 def get_connection(self) -> Connection:
awgur@21 85 _reconnects = 1
awgur@21 86
awgur@21 87 # Подготавливаем имя пользователя к авторизации в AD
awgur@21 88 if self.config.auth_domain != '':
awgur@21 89 _ldap_auth_uname = f'{self.config.auth_domain}\\{self.config.user_name}'
awgur@21 90 else:
awgur@21 91 _ldap_auth_uname = self.config.user_name
awgur@21 92
awgur@21 93 while True:
awgur@21 94 _reconnects += 1
awgur@21 95 try:
awgur@21 96 ldap_connection = Connection(self.server, authentication=SIMPLE,
awgur@21 97 user=_ldap_auth_uname, password=self.config.passwd,
awgur@21 98 check_names=True,
awgur@21 99 auto_referrals=False, raise_exceptions=True, auto_range=True,
awgur@21 100 )
awgur@21 101
awgur@21 102 ldap_connection.open()
awgur@21 103 if not ldap_connection.bind():
awgur@21 104 continue # Пытаемся переподключиться при ошибках
awgur@21 105
awgur@21 106 break
awgur@21 107
awgur@21 108 except LDAPInvalidCredentialsResult:
awgur@21 109 raise LdapAuthDenied(self.config.user_name)
awgur@21 110
awgur@21 111 except LDAPException as e:
awgur@21 112 if _reconnects > self.config.reconnects:
awgur@21 113 # Возбуждаем исключение после того как попробовали несколько раз
awgur@21 114 raise LdapServerError(f'Ошибка подключения к серверу: {e}')
awgur@21 115
awgur@21 116 if ldap_connection is None:
awgur@21 117 raise LdapServerError('Не выполнено подключение к серверу')
awgur@21 118
awgur@21 119 return ldap_connection
awgur@21 120
awgur@21 121 def search(self, ldap_filter: str,
awgur@21 122 attribs: Optional[List[str]] = None,
awgur@21 123 base_dn: Optional[str] = None):
awgur@21 124
awgur@21 125 if base_dn is None:
awgur@21 126 if not self.config.base_dn:
awgur@21 127 raise LdapUserError('Не задан Base DN для поиска в LDAP')
awgur@21 128
awgur@21 129 base_dn = self.config.base_dn
awgur@21 130
awgur@21 131 if attribs is None:
awgur@21 132 attribs = ['cn']
awgur@21 133
awgur@21 134 ldap_connection = self.get_connection()
awgur@21 135 ldap_connection.search(base_dn, ldap_filter, attributes=attribs)
awgur@21 136 if ldap_connection.result['result'] != 0:
awgur@21 137 raise LdapServerError(f'Не могу выполнить запрос к серверу LDAP: '
awgur@21 138 f'{ldap_connection.result.get("description")}')
awgur@21 139
awgur@21 140 for itm in ldap_connection.response:
awgur@21 141 yield LdapRes(dn=itm['dn'], attribs=itm['attributes'])
awgur@21 142
awgur@21 143 @staticmethod
awgur@21 144 def filter_group_names(group):
awgur@21 145 group_res = list(filter(lambda x: x.lower().startswith('cn'), group.split(',')))
awgur@21 146 if group_res:
awgur@21 147 try:
awgur@21 148 return group_res[0].split('=')[1]
awgur@21 149 except IndexError:
awgur@21 150 raise LdapError(f'Имя группы с неожиданным форматом: {group_res[0]}')
awgur@21 151 else:
awgur@21 152 raise LdapError(f'Нет атрибута CN в имени: {group}')
awgur@21 153
awgur@21 154
awgur@21 155 @staticmethod
awgur@21 156 def decode_acc_status(acc_status):
awgur@21 157 res = []
awgur@21 158
awgur@21 159 if acc_status & ACC_STATUS_ACCOUNTDISABLE != 0:
awgur@21 160 res.append('ACCOUNTDISABLE')
awgur@21 161
awgur@21 162 if acc_status & ACC_STATUS_NORMAL_ACCOUNT != 0:
awgur@21 163 res.append('NORMAL_ACCOUNT')
awgur@21 164
awgur@21 165 if acc_status & ACC_STATUS_LOCKOUT != 0:
awgur@21 166 res.append('LOCKOUT')
awgur@21 167
awgur@21 168 if acc_status & ACC_STATUS_PASSWORD_EXPIRED != 0:
awgur@21 169 res.append('PASSWORD_EXPIRED')
awgur@21 170
awgur@21 171 return res