py.lib

Yohn Y. 2022-02-23 Parent:cab7fedf8432

23:1668cc57225b Go to Latest

py.lib/ldap_utils/ldap_util.py

. Рефакторинг бессмысленный и беспощадный

History
     1.1 --- a/ldap_utils/ldap_util.py	Sat Nov 27 12:29:59 2021 +0300
     1.2 +++ b/ldap_utils/ldap_util.py	Wed Feb 23 19:27:33 2022 +0300
     1.3 @@ -2,85 +2,98 @@
     1.4  import ldap
     1.5  
     1.6  # ===========================================================
     1.7 -# Статус завершения оперций
     1.8 +# Статус завершения операций
     1.9  STATUS_OK = 0           # Всё ОК.
    1.10  STATUS_BADAUTH = 1      # Не верные пользователь или пароль.
    1.11  STATUS_SERVERDOWN = 2   # Сервер не доступен.
    1.12  STATUS_SERVERERROR = 3  # Ошибка при взаимодействии с сервером.
    1.13  
    1.14 -class LdapError(Exception): pass
    1.15 +
    1.16 +class LdapError(Exception):
    1.17 +    pass
    1.18 +
    1.19  
    1.20  class LdapAuth(object):
    1.21 -	def __init__(self, server, userPrefix = 'CORP\\', baseDN = 'DC=example,DC=net', attr = []):
    1.22 -		self.server = server
    1.23 -		self.prefix = userPrefix
    1.24 -		self.status = ''
    1.25 -		self.baseDN = baseDN
    1.26 -		self.statusCode = 0
    1.27 -		self.groups = None
    1.28 -		self.attr = None
    1.29 -		self._needAttr = [ i for i in map(str, attr) ]
    1.30 -		
    1.31 -	def __call__(self, user, passwd):
    1.32 -		try:
    1.33 -			conn = ldap.initialize(self.server)
    1.34 -			conn.set_option(ldap.OPT_REFERRALS, 0)
    1.35 -			conn.simple_bind_s(self.prefix + user, passwd)
    1.36 -		except ldap.INVALID_CREDENTIALS:
    1.37 -			conn.unbind()
    1.38 -			self.status = 'Invalid credentials'
    1.39 -			self.statusCode = STATUS_BADAUTH
    1.40 -			return False
    1.41 -		except ldap.SERVER_DOWN:
    1.42 -			self.status = 'Server is down'
    1.43 -			self.statusCode = STATUS_SERVERDOWN
    1.44 -			return False
    1.45 -		
    1.46 -		self.groups = []
    1.47 -		try:
    1.48 -			ldapData = conn.search_s(self.baseDN, ldap.SCOPE_SUBTREE, '(cn=%s)' % user, ['memberOf'] + self._needAttr)[0][1]
    1.49 -			for i in ldapData['memberOf']:
    1.50 -				self.groups.append(i.split(',')[0].split('=')[1].decode('utf-8'))
    1.51 -			del ldapData['memberOf']
    1.52 -			self.attr = ldapData
    1.53 -		except KeyError:
    1.54 -			self.status = 'User object from LDAP is wrong, it can be anonymous logon'
    1.55 -			self.statusCode = STATUS_SERVERERROR
    1.56 -			return False
    1.57 -		finally:
    1.58 -			conn.unbind()			
    1.59 +    def __init__(self, server, user_prefix='CORP\\', base_dn='DC=example,DC=net', attr=None):
    1.60 +        self.server = server
    1.61 +        self.prefix = user_prefix
    1.62 +        self.status = ''
    1.63 +        self.baseDN = base_dn
    1.64 +        self.status_code = 0
    1.65 +        self.groups = None
    1.66 +        self.attr = None
    1.67 +        self._needAttr = [i for i in map(str, attr)] if attr is not None else []
    1.68 +
    1.69 +    def __call__(self, user, passwd):
    1.70 +        conn = None
    1.71 +        try:
    1.72 +            conn = ldap.initialize(self.server)
    1.73 +            conn.set_option(ldap.OPT_REFERRALS, 0)
    1.74 +            conn.simple_bind_s(self.prefix + user, passwd)
    1.75 +
    1.76 +        except ldap.INVALID_CREDENTIALS:
    1.77 +            if conn is not None:
    1.78 +                conn.unbind()
    1.79 +
    1.80 +            self.status = 'Invalid credentials'
    1.81 +            self.status_code = STATUS_BADAUTH
    1.82 +            return False
    1.83 +
    1.84 +        except ldap.SERVER_DOWN:
    1.85 +            self.status = 'Server is down'
    1.86 +            self.status_code = STATUS_SERVERDOWN
    1.87 +            return False
    1.88 +
    1.89 +        self.groups = []
    1.90 +        try:
    1.91 +            ldap_data = conn.search_s(self.baseDN, ldap.SCOPE_SUBTREE, f'(cn={user})',
    1.92 +                                      ['memberOf'] + self._needAttr)[0][1]
    1.93 +
    1.94 +            for i in ldap_data['memberOf']:
    1.95 +                self.groups.append(i.split(',')[0].split('=')[1].decode('utf-8'))
    1.96 +
    1.97 +            del ldap_data['memberOf']
    1.98 +            self.attr = ldap_data
    1.99  
   1.100 -		return True
   1.101 -	
   1.102 -	def __getitem__(self, key):
   1.103 -		return self.attr[key]
   1.104 -	
   1.105 -	def memberOf(self, group):
   1.106 -		"""Проверка на присутствие у пользователя некоторой группы
   1.107 -		"""
   1.108 -		if self.groups == None:
   1.109 -			raise LdapError('Request membership before auth call.')
   1.110 -			
   1.111 -		if not isinstance(group, unicode):
   1.112 -			if isinstance(group, str):
   1.113 -				group = group.decode('utf-8')
   1.114 -			else:
   1.115 -				group = str(group).decode('utf-8')
   1.116 -		
   1.117 -		if group in self.groups:
   1.118 -			return True
   1.119 -		else:
   1.120 -			return False
   1.121 -	
   1.122 -	def __contains__(self, group):
   1.123 -		return self.memberOf(group)
   1.124 -		
   1.125 -	def memberOfGroups(self, groups):
   1.126 -		if not len(groups) > 0:
   1.127 -			return False
   1.128 -		
   1.129 -		for group in groups:
   1.130 -			if not group in self:
   1.131 -				return False
   1.132 -		
   1.133 -		return True
   1.134 +        except KeyError:
   1.135 +            self.status = 'User object from LDAP is wrong, it can be anonymous logon'
   1.136 +            self.status_code = STATUS_SERVERERROR
   1.137 +            return False
   1.138 +
   1.139 +        finally:
   1.140 +            conn.unbind()
   1.141 +
   1.142 +        return True
   1.143 +
   1.144 +    def __getitem__(self, key):
   1.145 +        return self.attr[key]
   1.146 +
   1.147 +    def member_of(self, group):
   1.148 +        """Проверка на присутствие у пользователя некоторой группы
   1.149 +        """
   1.150 +        if self.groups is None:
   1.151 +            raise LdapError('Request membership before auth call')
   1.152 +
   1.153 +        if not isinstance(group, unicode):
   1.154 +            if isinstance(group, str):
   1.155 +                group = group.decode('utf-8')
   1.156 +            else:
   1.157 +                group = str(group).decode('utf-8')
   1.158 +
   1.159 +        if group in self.groups:
   1.160 +            return True
   1.161 +        else:
   1.162 +            return False
   1.163 +
   1.164 +    def __contains__(self, group):
   1.165 +        return self.member_of(group)
   1.166 +
   1.167 +    def member_of_groups(self, groups):
   1.168 +        if not len(groups) > 0:
   1.169 +            return False
   1.170 +
   1.171 +        for group in groups:
   1.172 +            if group not in self:
   1.173 +                return False
   1.174 +
   1.175 +        return True