py.lib
2022-02-23
Parent:cab7fedf8432
py.lib/ldap_utils/ldap_util.py
. Рефакторинг бессмысленный и беспощадный
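--- a/ldap_utils/ldap_util.py	Sat Nov 27 12:29:59 2021 +0300
+++ b/ldap_utils/ldap_util.py	Wed Feb 23 19:27:33 2022 +0300
@@ -2,85 +2,98 @@
 import ldap
 
 # ===========================================================
-# Статус завершения оперций
+# Статус завершения операций
 STATUS_OK = 0 # Всё ОК.
 STATUS_BADAUTH = 1 # Не верные пользователь или пароль.
 STATUS_SERVERDOWN = 2 # Сервер не доступен.
 STATUS_SERVERERROR = 3 # Ошибка при взаимодействии с сервером.
 
-class LdapError(Exception): pass
+
+class LdapError(Exception):
+    pass
+
 
 class LdapAuth(object):
-    def __init__(self, server, userPrefix = 'CORP\\', baseDN = 'DC=example,DC=net', attr = []):
-        self.server = server
-        self.prefix = userPrefix
-        self.status = ''
-        self.baseDN = baseDN
-        self.statusCode = 0
-        self.groups = None
-        self.attr = None
-        self._needAttr = [ i for i in map(str, attr) ]
-
-    def __call__(self, user, passwd):
-        try:
-            conn = ldap.initialize(self.server)
-            conn.set_option(ldap.OPT_REFERRALS, 0)
-            conn.simple_bind_s(self.prefix + user, passwd)
-        except ldap.INVALID_CREDENTIALS:
-            conn.unbind()
-            self.status = 'Invalid credentials'
-            self.statusCode = STATUS_BADAUTH
-            return False
-        except ldap.SERVER_DOWN:
-            self.status = 'Server is down'
-            self.statusCode = STATUS_SERVERDOWN
-            return False
-
-        self.groups = []
-        try:
-            ldapData = conn.search_s(self.baseDN, ldap.SCOPE_SUBTREE, '(cn=%s)' % user, ['memberOf'] + self._needAttr)[0][1]
-            for i in ldapData['memberOf']:
-                self.groups.append(i.split(',')[0].split('=')[1].decode('utf-8'))
-            del ldapData['memberOf']
-            self.attr = ldapData
-        except KeyError:
-            self.status = 'User object from LDAP is wrong, it can be anonymous logon'
-            self.statusCode = STATUS_SERVERERROR
-            return False
-        finally:
-            conn.unbind()
+    def __init__(self, server, user_prefix='CORP\\', base_dn='DC=example,DC=net', attr=None):
+        self.server = server
+        self.prefix = user_prefix
+        self.status = ''
+        self.baseDN = base_dn
+        self.status_code = 0
+        self.groups = None
+        self.attr = None
+        self._needAttr = [i for i in map(str, attr)] if attr is not None else []
+
+    def __call__(self, user, passwd):
+        conn = None
+        try:
+            conn = ldap.initialize(self.server)
+            conn.set_option(ldap.OPT_REFERRALS, 0)
+            conn.simple_bind_s(self.prefix + user, passwd)
+
+        except ldap.INVALID_CREDENTIALS:
+            if conn is not None:
+                conn.unbind()
+
+            self.status = 'Invalid credentials'
+            self.status_code = STATUS_BADAUTH
+            return False
+
+        except ldap.SERVER_DOWN:
+            self.status = 'Server is down'
+            self.status_code = STATUS_SERVERDOWN
+            return False
+
+        self.groups = []
+        try:
+            ldap_data = conn.search_s(self.baseDN, ldap.SCOPE_SUBTREE, f'(cn={user})',
+                                      ['memberOf'] + self._needAttr)[0][1]
+
+            for i in ldap_data['memberOf']:
+                self.groups.append(i.split(',')[0].split('=')[1].decode('utf-8'))
+
+            del ldap_data['memberOf']
+            self.attr = ldap_data
 
-        return True
-
-    def __getitem__(self, key):
-        return self.attr[key]
-
-    def memberOf(self, group):
-        """Проверка на присутствие у пользователя некоторой группы
-        """
-        if self.groups == None:
-            raise LdapError('Request membership before auth call.')
-
-        if not isinstance(group, unicode):
-            if isinstance(group, str):
-                group = group.decode('utf-8')
-            else:
-                group = str(group).decode('utf-8')
-
-        if group in self.groups:
-            return True
-        else:
-            return False
-
-    def __contains__(self, group):
-        return self.memberOf(group)
-
-    def memberOfGroups(self, groups):
-        if not len(groups) > 0:
-            return False
-
-        for group in groups:
-            if not group in self:
-                return False
-
-        return True
+        except KeyError:
+            self.status = 'User object from LDAP is wrong, it can be anonymous logon'
+            self.status_code = STATUS_SERVERERROR
+            return False
+
+        finally:
+            conn.unbind()
+
+        return True
+
+    def __getitem__(self, key):
+        return self.attr[key]
+
+    def member_of(self, group):
+        """Проверка на присутствие у пользователя некоторой группы
+        """
+        if self.groups is None:
+            raise LdapError('Request membership before auth call')
+
+        if not isinstance(group, unicode):
+            if isinstance(group, str):
+                group = group.decode('utf-8')
+            else:
+                group = str(group).decode('utf-8')
+
+        if group in self.groups:
+            return True
+        else:
+            return False
+
+    def __contains__(self, group):
+        return self.member_of(group)
+
+    def member_of_groups(self, groups):
+        if not len(groups) > 0:
+            return False
+
+        for group in groups:
+            if group not in self:
+                return False
+
+        return True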
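Review bot: The new code straddles Python versions: the f-string filter at 1.91 needs Python 3.6+, while `member_of` at 1.153–1.157 still uses the Python 2 `unicode` builtin and `str.decode`, so on Python 3 every membership check raises NameError (and python-ldap 3 hands attribute values back as bytes, so the `i.split(',')` at 1.95 would likely need `i.decode('utf-8')` first). The search filter is also built by interpolating `user` unescaped into `f'(cn={user})'`; since the same string has just been bound with, the exposure is limited, but `from ldap.filter import escape_filter_chars` and `f'(cn={escape_filter_chars(user)})'` costs nothing and closes it. `simple_bind_s` with an empty `passwd` is an unauthenticated bind that many directories accept, which is what the "anonymous logon" KeyError branch at 1.134 is guessing at; rejecting an empty password up front makes that path explicit, and an empty search result would currently surface as an uncaught IndexError from `[0]` at 1.92 rather than reach that branch. Finally, the `SERVER_DOWN` handler at 1.84–1.87 leaves a connection created by `ldap.initialize` unreleased, unlike the `INVALID_CREDENTIALS` handler just above it.
```python
    def __call__(self, user, passwd):
        # An empty password is an unauthenticated bind, not a failed one, on many directories.
        if not passwd:
            self.status = 'Invalid credentials'
            self.status_code = STATUS_BADAUTH
            return False
        conn = None
        # … unchanged: initialize / set_option / simple_bind_s and the two except branches …
        try:
            ldap_data = conn.search_s(self.baseDN, ldap.SCOPE_SUBTREE,
                                      f'(cn={escape_filter_chars(user)})',
                                      ['memberOf'] + self._needAttr)[0][1]
            # … unchanged: memberOf parsing, attr assignment …
        except (KeyError, IndexError):
            # … unchanged: status assignment, finally/unbind, return True …

    def member_of(self, group):
        """Проверка на присутствие у пользователя некоторой группы
        """
        if self.groups is None:
            raise LdapError('Request membership before auth call')
        # Python 3: there is no `unicode`; groups were already decoded to str.
        return str(group) in self.groups
```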