py.lib

Yohn Y. 2022-08-19 Parent:f1a05e880961

37:ae0107755941 Go to Latest

py.lib/config_parse_helper.py

. Исправление ошибок и рефакторинг

History
1 # coding: utf-8
2 from configparser import ConfigParser
3 from dataclasses import is_dataclass, fields, Field, MISSING
4 from typing import Iterable, Optional, Dict, Any, List, Callable, Union
5 from threading import RLock
6 from os import getenv
9 class ConfigParseHelperError(Exception):
10 """\
11 Ошибки в модуле помощника разборщика конфигураций
12 """
15 class NoSectionNotification(ConfigParseHelperError):
16 """\
17 Оповещение об отсутствующей секции конфигурации
18 """
21 class TypeDescriber:
22 """\
23 Реализует паттерн "адаптер" поверх типов, для упрощения приведения значений типов к объявленным формам
24 """
25 def __init__(self, name, cast, like, is_complex=False):
26 self.__name__ = name
27 self.cast = cast
28 self.like = like
29 self.is_complex = is_complex
31 def check(self, instance):
32 if self.like is None:
33 return False
35 else:
36 return check_instance(instance, self.like)
38 def __repr__(self):
39 return f'<TypeDescriber({self.__name__}, {self.like})>'
41 def __call__(self, val):
42 if val is None:
43 return None
45 elif not self.is_complex and check_instance(val, self.like):
46 return val
48 else:
49 return self.cast(val)
52 def check_instance(obj, types):
53 if types is None:
54 return False
56 elif isinstance(types, (tuple, list)):
57 _flag = False
58 for t in types:
59 if check_instance(obj, t):
60 _flag = True
61 break
63 return _flag
65 elif isinstance(types, TypeDescriber):
66 return types.check(obj)
68 else:
69 return isinstance(obj, types)
72 def cast_iterator(t: Union[type, TypeDescriber], lst: Iterable):
73 """\
74 Обрабатывает последовательности единого типа.
75 """
76 for i in lst:
77 if check_instance(i, t):
78 yield i
80 else:
81 try:
82 yield t(i)
84 except (TypeError, ValueError) as e:
85 raise ValueError(f'Не удалось привести значение к нужному типу: '
86 f'тип={t.__name__}; знач={i}')
89 def multi_item_tuple(tt, val):
90 """\
91 Обрабатывает кортежи, состоящие из нескольких значений типов (кортежи элементов разных типов)
92 :param tt: Последовательность составляющих кортеж типов
93 :param val: итерируемый объект, хранящий значения в указанном порядке.
94 """
95 val = list(val)
96 t_len = len(tt)
97 if t_len == 0:
98 raise ValueError('При вызове процедуры конвертации котежей, не были указаны типы')
100 if len(val) != t_len:
101 raise ValueError(f'Значение не содержит положенных {t_len} элементов: {len(val)} - {val}')
103 res = []
105 for i in range(t_len):
106 if check_instance(val[i], tt[i]):
107 res.append(val[i])
109 else:
110 try:
111 res.append(tt[i](val[i]))
113 except (TypeError, ValueError) as e:
114 raise ValueError(f'Не удалось привести значение к нужному типу: '
115 f'тип={tt[i].__name__}; знач={val[i]}')
117 return tuple(res)
120 def union_processor(tt, val):
121 """\
122 Пытается привести значение к одному из указанных типов
123 :param tt: список возможных типов
124 :param val: приводимое значение
125 """
126 if val is None:
127 return val
129 res = None
130 ex = []
132 if len(tt) == 0:
133 raise ValueError('Не указан ни один тип в составном типе Union')
135 sorted_types_begin = [ t for t in tt if t in (int, float, bool)]
136 sorted_types_body = [ t for t in tt if t not in (int, float, bool, str)]
137 sorted_types_end = [ t for t in tt if t == str]
139 for t in sorted_types_begin + sorted_types_body + sorted_types_end:
140 _t = get_type_describer(t)
141 try:
142 res = _t(val)
143 break
145 except (TypeError, ValueError) as e:
146 ex.append(f'{t}: {e}')
148 if res is None:
149 raise ValueError('Не удалось привести значение не к одному из типов:\n' + '\n'.join(ex))
151 else:
152 return res
155 def dict_processor(tt, val):
156 if len(tt) != 2:
157 raise ValueError(f'Попытка воссоздать словарь со странным количеством аргументов типа: {tt}')
159 try:
160 _d = dict(val)
162 except (TypeError, ValueError) as e:
163 raise ValueError(f'Не удалось воссоздать словарь из представленного значения: {e}')
165 _p = []
167 for k, v in _d.items():
168 try:
169 _p.append((tt[0](k), tt[1](v)))
171 except (TypeError, ValueError) as e:
172 raise ValueError(f'Не удалось привести значения элемента словаря к требуемому типу: '
173 f'key="{k}" value="{v}"')
175 return dict(_p)
178 def get_type_describer(t) -> TypeDescriber:
179 if type(t).__name__ == '_GenericAlias':
180 try:
181 _args = t.__args__
183 except AttributeError:
184 raise TypeError(f'Неизвестный тип хранения внутренних типов для представителя сложного '
185 f'типа "_GenericAlias": {t}')
187 if t.__name__ == 'List':
188 try:
189 _t = _args[0]
191 except IndexError:
192 raise ValueError(f'Тип {t} не содержит в себе типа своих элементов')
194 return TypeDescriber(
195 name=f'{t.__name__}[{_t.__name__}]',
196 cast=lambda x: list(cast_iterator(get_type_describer(_t).cast, x)),
197 like=list,
198 is_complex=True
201 elif t.__name__ == 'Tuple':
202 if not _args:
203 raise ValueError(f'Тип {t} не содержит в себе пояснений по составу хранящихся в нём типов')
205 if len(_args) == 1:
206 _t = _args[0]
208 return TypeDescriber(
209 name=f'Tuple[{_t.__name__}]',
210 cast=lambda x: list(cast_iterator(get_type_describer(_t).cast, x)),
211 like=tuple,
212 is_complex=True
215 else:
216 _name = ', '.join(map(lambda x: x.__name__, _args))
217 _cast_args = tuple(get_type_describer(i) for i in _args)
219 return TypeDescriber(
220 name=f'Tuple[{_name}]',
221 cast=lambda x: multi_item_tuple(_cast_args, x),
222 like=tuple,
223 is_complex=True
226 elif t.__name__ == 'Dict':
227 if len(_args) != 2:
228 raise ValueError(f'Неожиданное количество значений типа в составном типе Dict: '
229 f'{len(_args)} values=({_args})')
231 _name = ', '.join(map(lambda x: x.__name__, _args))
233 return TypeDescriber(
234 name=f'Dict[{_name}]',
235 cast=lambda x: dict_processor(_args, x),
236 like=dict,
237 is_complex=True
240 else:
241 raise ValueError(f'Неизвестный представитель типа "_GenericAlias": {t.__name__}')
243 elif type(t).__name__ == '_UnionGenericAlias':
244 if t.__name__ not in ('Union', 'Optional'):
245 raise TypeError(f'Неизвестный подтип _UnionGenericAlias: {t.__name__}')
247 try:
248 _args = t.__args__
250 except AttributeError:
251 raise TypeError(f'Неизвестный тип хранения внутренних типов для представителя сложного '
252 f'типа "_UnionGenericAlias": {t}')
254 if len(_args) == 0:
255 raise ValueError('Не указан ни один тип в конструкции Union')
257 _cast_args = tuple(map(get_type_describer, _args))
258 _args_name = ', '.join(map(lambda x: x.__name__, _args))
260 return TypeDescriber(
261 name=f'Union[{_args_name}]',
262 cast=lambda x: union_processor(_cast_args, x),
263 like=None
266 else:
267 return TypeDescriber(
268 name=t.__name__,
269 cast=t,
270 like=t
274 class CPHSectionBase:
275 """\
276 Базовый класс обработки секции конфигурационного файла
277 """
279 def get(self, config_prop_name: str, dc_prop_name: str):
280 """\
281 Получить свойство из конфигурационного файла
282 """
283 raise NotImplemented()
285 def __enter__(self):
286 return self
288 def __exit__(self, exc_type, exc_val, exc_tb):
289 raise NotImplemented()
292 class CPHAbsentSection(CPHSectionBase):
293 """\
294 Класс создаваемый на отсутствующую секцию конфигурационного файла
295 """
296 def get(self, config_prop_name: str, dc_prop_name: str):
297 raise NoSectionNotification()
299 def __exit__(self, exc_type, exc_val, exc_tb):
300 if exc_type == NoSectionNotification:
301 return True
304 class CPHParamGetter(CPHSectionBase):
305 def __init__(self, parser_helper_object):
306 self.ph = parser_helper_object
307 self.params = {}
309 def _add_param(self, param_name: str, param_val: Any, parser: Optional[Callable[[Any], Any]] = None):
310 """\
311 Непосредственное добавление полученного параметра со всеми проверками.
312 """
314 if parser is not None and param_val is not None:
315 param_val = parser(param_val)
317 fld = self.ph.fields.get(param_name)
318 if not isinstance(fld, Field):
319 raise ConfigParseHelperError(f'В классе данных отсутствует свойство "{param_name}", '
320 f'которое мы должны заполнить из параметра конфигурации: {fld}')
322 if param_val is not None:
323 type_desc = get_type_describer(fld.type)
324 try:
325 res = type_desc(param_val)
327 except (ValueError, TypeError) as e:
328 raise ConfigParseHelperError(f'При приведении параметра к '
329 f'заданному типу произошла ошибка: '
330 f'значение="{param_val}" ошибка="{e}"')
332 else:
333 if fld.default is not MISSING:
334 res = fld.default
336 elif fld.default_factory is not MISSING:
337 res = fld.default_factory()
339 else:
340 raise ConfigParseHelperError('В конфигурации не заданна обязательная опция')
342 self.params[param_name] = res
344 def __exit__(self, exc_type, exc_val, exc_tb):
345 if exc_type is None:
346 self.ph.add_params(self.params)
348 def get(self, config_prop_name: str, dc_prop_name: str, parser: Optional[Callable[[Any], Any]] = None):
349 raise NoSectionNotification()
352 class CPHSection(CPHParamGetter):
353 """\
354 Класс производящий разбор конкретной секции конфигурации
355 """
357 def __init__(self, parser_helper_object, section: str):
358 super().__init__(parser_helper_object)
359 self.section_name = section
360 self.section = parser_helper_object.conf_parser[section]
362 def get(self, config_prop_name: str, dc_prop_name: str, parser: Optional[Callable[[Any], Any]] = None):
363 """\
364 :param config_prop_name: Имя опции в файле конфигурации
365 :param dc_prop_name: Имя свойства в классе данных, хранящем конфигурацию
366 :param parser: Исполнимый обработчик значения, перед его помещением в конфигурацию
367 """
368 try:
369 self._add_param(dc_prop_name, self.section.get(config_prop_name), parser)
371 except ConfigParseHelperError as e:
372 raise ConfigParseHelperError(f'Ошибка при разборе параметра "{config_prop_name}" '
373 f'в секции "{self.section_name}": {e}')
376 class CPHEnvParser(CPHParamGetter):
377 """\
378 Класс для разбора переменных окружения в том же ключе, что и файла конфигурации
379 """
381 def get(self, env_name: str, dc_prop_name: str, parser: Optional[Callable[[Any], Any]] = None):
382 """\
383 :param env_name: Имя переменной окружения, хранящей опцию
384 :param dc_prop_name: Имя свойства в классе данных, хранящем конфигурацию
385 :param parser: Исполнимый обработчик значения, перед его помещением в конфигурацию
386 """
388 try:
389 self._add_param(dc_prop_name, getenv(env_name), parser)
391 except ConfigParseHelperError as e:
392 raise ConfigParseHelperError(f'Ошибка при получении значения из переменной окружения "{env_name}": {e}')
395 class CPHObjectsListGetter:
396 """\
397 Помощник для случаев, когда в наборе секций хранится однотипный набор объектов
398 """
399 def __init__(self, config_object_class: type, config_parser: ConfigParser, sections: Iterable[str]):
400 self.sections = sections
401 self.conf_parser = config_parser
403 if not is_dataclass(config_object_class):
404 raise ConfigParseHelperError(f'Представленный в качестве представления объекта конфигурации '
405 f'класс не является классом данных: {config_object_class.__name__}')
407 self.res_obj = config_object_class
408 self.fields = dict((fld.name, fld) for fld in fields(config_object_class))
409 self.obj_list = []
410 self.ident_list = []
412 def add_params(self, params: Dict[str, Any]):
413 try:
414 self.obj_list.append(self.res_obj(**params))
416 except (ValueError, TypeError) as e:
417 raise ConfigParseHelperError(f'Ошибка создания объекта объекта конфигурации, '
418 f'списка объектов конфигурации: {e}')
420 def get(self, config_prop_name: str, dc_prop_name: str, parser: Optional[Callable[[Any], Any]] = None):
421 """\
422 Подготавливаем список соответствия названий параметров в секции конкретным свойствам данного
423 в помощник класса
425 :param config_prop_name: Имя опции в файле конфигурации
426 :param dc_prop_name: Имя свойства в классе данных, хранящем конфигурацию
427 :param parser: Исполнимый обработчик значения, перед его помещением в конфигурацию
428 """
430 self.ident_list.append((config_prop_name, dc_prop_name, parser))
432 def get_config_objects(self) -> List[object]:
433 for section in self.sections:
434 try:
435 with CPHSection(self, section) as section_helper:
436 for conf_prop, dc_prop, parser in self.ident_list:
437 section_helper.get(conf_prop, dc_prop, parser)
439 except ConfigParseHelperError as e:
440 raise ConfigParseHelperError(f'Ошибка при разборе секции "{section}": {e}')
442 res = self.obj_list[:]
443 self.obj_list.clear()
445 return res
448 class ConfigParseHelper:
449 """\
450 Помощник разбора конфигурации
451 """
452 def __init__(self, config_object_class: type, required_sections: Optional[Iterable[str]] = None):
453 """\
454 :param config_object_class: Dataclass, который мы подготовили как хранилище параметров конфигурации
455 :param required_sections: Перечисление секций конфигурации, которые мы требуем, чтобы были в файле
456 """
458 if required_sections is not None:
459 self.req_sections = set(required_sections)
461 else:
462 self.req_sections = set()
464 if not is_dataclass(config_object_class):
465 raise ConfigParseHelperError(f'Представленный в качестве объекта конфигурации класс не является '
466 f'классом данных: {config_object_class.__name__}')
468 self.res_obj = config_object_class
469 self.fields = dict((fld.name, fld) for fld in fields(config_object_class))
470 self.conf_parser: Optional[ConfigParser] = None
471 self.config_params = {}
472 self.config_params_lock = RLock()
474 def add_params(self, params: Dict[str, Any]):
475 self.config_params_lock.acquire()
476 try:
477 self.config_params.update(params)
479 finally:
480 self.config_params_lock.release()
482 def section(self, section_name: str) -> CPHSectionBase:
483 if self.conf_parser is None:
484 raise ConfigParseHelperError(f'Прежде чем приступать к разбору файла конфигурации стоит его загрузить')
486 if self.conf_parser.has_section(section_name):
487 return CPHSection(self, section_name)
489 else:
490 return CPHAbsentSection()
492 def load(self, filename: str):
493 res = ConfigParser()
494 try:
495 res.read(filename)
497 except (TypeError, IOError, OSError, ValueError) as e:
498 raise ConfigParseHelperError(f'Не удалось загрузить файл конфигурации: файл="{filename}" '
499 f'ошибка="{e}"')
501 missing_sections = self.req_sections - set(res.sections())
503 if missing_sections:
504 missing_sections = ', '.join(missing_sections)
505 raise ConfigParseHelperError(f'В конфигурационном файле отсутствуют секции: {missing_sections}')
507 self.conf_parser = res
509 def get_config(self):
510 try:
511 return self.res_obj(**self.config_params)
513 except (ValueError, TypeError) as e:
514 raise ConfigParseHelperError(f'Не удалось инициализировать объект конфигурации: {e}')
516 def get_sections(self):
517 return self.conf_parser.sections()