py.lib

Yohn Y. 2022-08-19 Parent:84b54a8a6d4c Child:ae0107755941

36:f1a05e880961 Go to Latest

py.lib/config_parse_helper.py

. Рефакторинг имени в dataclass_utils.py * Изменение по типу модуля dataclass_utils.py в модуле config_parse_helper.py

History
1 # coding: utf-8
2 from configparser import ConfigParser
3 from dataclasses import is_dataclass, fields, Field, MISSING
4 from typing import Iterable, Optional, Dict, Any, List, Callable, Union
5 from threading import RLock
6 from os import getenv
9 class ConfigParseHelperError(Exception):
10 """\
11 Ошибки в модуле помощника разборщика конфигураций
12 """
15 class NoSectionNotification(ConfigParseHelperError):
16 """\
17 Оповещение об отсутствующей секции конфигурации
18 """
21 class TypeDescriber:
22 """\
23 Реализует паттерн "адаптер" поверх типов, для упрощения приведения значений типов к объявленным формам
24 """
25 def __init__(self, name, cast, like, is_complex=False):
26 self.__name__ = name
27 self.cast = cast
28 self.like = like
29 self.is_complex = is_complex
31 def __instancecheck__(self, instance):
32 if self.like is None:
33 return False
35 else:
36 return isinstance(instance, self.like)
38 def __repr__(self):
39 return f'<TypeDescriber({self.__name__}, {self.like})>'
41 def __call__(self, val):
42 if val is None:
43 return None
45 else:
46 return self.cast(val)
49 def cast_iterator(t: Union[type, TypeDescriber], lst: Iterable):
50 """\
51 Обрабатывает последовательности единого типа.
52 """
53 for i in lst:
54 if isinstance(i, t):
55 yield i
57 try:
58 yield t(i)
60 except (TypeError, ValueError) as e:
61 raise ValueError(f'Не удалось привести значение к нужному типу: тип={t.__name__}; знач={i}')
64 def multi_item_tuple(tt, val):
65 """\
66 Обрабатывает кортежи, состоящие из нескольких значений типов (кортежи элементов разных типов)
67 :param tt: Последовательность составляющих кортеж типов
68 :param val: итерируемый объект, хранящий значения в указанном порядке.
69 """
70 val = list(val)
71 t_len = len(tt)
72 if t_len == 0:
73 raise ValueError('При вызове процедуры конвертации котежей, не были указаны типы')
75 if len(val) != t_len:
76 raise ValueError(f'Значение не содержит положенных {t_len} элементов: {len(val)} - {val}')
78 res = []
80 for i in range(t_len):
81 if isinstance(val[i], tt[i]):
82 yield val[i]
84 try:
85 res.append(tt[i](val[i]))
87 except (TypeError, ValueError) as e:
88 raise ValueError(f'Не удалось привести значение к нужному типу: тип={tt[i].__name__}; знач={val[i]}')
90 return tuple(res)
93 def union_processor(tt, val):
94 """\
95 Пытается привести значение к одному из указанных типов
96 :param tt: список возможных типов
97 :param val: приводимое значение
98 """
99 if val is None:
100 return val
102 res = None
103 ex = []
105 if len(tt) == 0:
106 raise ValueError('Не указан ни один тип в составном типе Union')
108 for t in tt:
109 try:
110 res = t(val)
111 break
113 except (TypeError, ValueError) as e:
114 ex.append(f'{t}: {e}')
116 if res is None:
117 raise ValueError('Не удалось привести значение не к одному из типов:\n' + '\n'.join(ex))
119 else:
120 return res
123 def dict_processor(tt, val):
124 if len(tt) != 2:
125 raise ValueError(f'Попытка воссоздать словарь со странным количеством аргументов типа: {tt}')
127 try:
128 _d = dict(val)
130 except (TypeError, ValueError) as e:
131 raise ValueError(f'Не удалось воссоздать словарь из представленного значения: {e}')
133 _p = []
135 for k, v in _d.items():
136 try:
137 _p.append((tt[0](k), tt[1](v)))
139 except (TypeError, ValueError) as e:
140 raise ValueError(f'Не удалось привести значения элемента словаря к требуемому типу: '
141 f'key="{k}" value="{v}"')
143 return dict(_p)
146 def get_type_describer(t) -> TypeDescriber:
147 if type(t).__name__ == '_GenericAlias':
148 try:
149 _args = t.__args__
151 except AttributeError:
152 raise TypeError(f'Неизвестный тип хранения внутренних типов для представителя сложного '
153 f'типа "_GenericAlias": {t}')
155 if t.__name__ == 'List':
156 try:
157 _t = _args[0]
159 except IndexError:
160 raise ValueError(f'Тип {t} не содержит в себе типа своих элементов')
162 return TypeDescriber(
163 name=f'{t.__name__}[{_t.__name__}]',
164 cast=lambda x: list(cast_iterator(get_type_describer(_t).cast, x)),
165 like=list,
166 is_complex=True
169 elif t.__name__ == 'Tuple':
170 if not _args:
171 raise ValueError(f'Тип {t} не содержит в себе пояснений по составу хранящихся в нём типов')
173 if len(_args) == 1:
174 _t = _args[0]
176 return TypeDescriber(
177 name=f'Tuple[{_t.__name__}]',
178 cast=lambda x: list(cast_iterator(get_type_describer(_t).cast, x)),
179 like=tuple,
180 is_complex=True
183 else:
184 _name = ', '.join(map(lambda x: x.__name__, _args))
185 _cast_args = tuple(get_type_describer(i) for i in _args)
187 return TypeDescriber(
188 name=f'Tuple[{_name}]',
189 cast=lambda x: multi_item_tuple(_cast_args, x),
190 like=tuple,
191 is_complex=True
194 elif t.__name__ == 'Dict':
195 if len(_args) != 2:
196 raise ValueError(f'Неожиданное количество значений типа в составном типе Dict: '
197 f'{len(_args)} values=({_args})')
199 _name = ', '.join(map(lambda x: x.__name__, _args))
201 return TypeDescriber(
202 name=f'Dict[{_name}]',
203 cast=lambda x: dict_processor(_args, x),
204 like=dict,
205 is_complex=True
208 else:
209 raise ValueError(f'Неизвестный представитель типа "_GenericAlias": {t.__name__}')
211 elif type(t).__name__ == '_UnionGenericAlias':
212 if t.__name__ not in ('Union', 'Optional'):
213 raise TypeError(f'Неизвестный подтип _UnionGenericAlias: {t.__name__}')
215 try:
216 _args = t.__args__
218 except AttributeError:
219 raise TypeError(f'Неизвестный тип хранения внутренних типов для представителя сложного '
220 f'типа "_UnionGenericAlias": {t}')
222 if len(_args) == 0:
223 raise ValueError('Не указан ни один тип в конструкции Union')
225 _cast_args = tuple(map(get_type_describer, _args))
226 _args_name = ', '.join(map(lambda x: x.__name__, _args))
228 return TypeDescriber(
229 name=f'Union[{_args_name}]',
230 cast=lambda x: union_processor(_cast_args, x),
231 like=None
234 else:
235 return TypeDescriber(
236 name=t.__name__,
237 cast=t,
238 like=t
242 class CPHSectionBase:
243 """\
244 Базовый класс обработки секции конфигурационного файла
245 """
247 def get(self, config_prop_name: str, dc_prop_name: str):
248 """\
249 Получить свойство из конфигурационного файла
250 """
251 raise NotImplemented()
253 def __enter__(self):
254 return self
256 def __exit__(self, exc_type, exc_val, exc_tb):
257 raise NotImplemented()
260 class CPHAbsentSection(CPHSectionBase):
261 """\
262 Класс создаваемый на отсутствующую секцию конфигурационного файла
263 """
264 def get(self, config_prop_name: str, dc_prop_name: str):
265 raise NoSectionNotification()
267 def __exit__(self, exc_type, exc_val, exc_tb):
268 if exc_type == NoSectionNotification:
269 return True
272 class CPHParamGetter(CPHSectionBase):
273 def __init__(self, parser_helper_object):
274 self.ph = parser_helper_object
275 self.params = {}
277 def _add_param(self, param_name: str, param_val: Any, parser: Optional[Callable[[Any], Any]] = None):
278 """\
279 Непосредственное добавление полученного параметра со всеми проверками.
280 """
282 if parser is not None and param_val is not None:
283 param_val = parser(param_val)
285 fld = self.ph.fields.get(param_name)
286 if not isinstance(fld, Field):
287 raise ConfigParseHelperError(f'В классе данных отсутствует свойство "{param_name}", '
288 f'которое мы должны заполнить из параметра конфигурации: {fld}')
290 if param_val is not None:
291 type_desc = get_type_describer(fld.type)
292 try:
293 res = type_desc(param_val)
295 except (ValueError, TypeError) as e:
296 raise ConfigParseHelperError(f'При приведении параметра к '
297 f'заданному типу произошла ошибка: '
298 f'значение="{param_val}" ошибка="{e}"')
300 else:
301 if fld.default is not MISSING:
302 res = fld.default
304 elif fld.default_factory is not MISSING:
305 res = fld.default_factory()
307 else:
308 raise ConfigParseHelperError('В конфигурации не заданна обязательная опция')
310 self.params[param_name] = res
312 def __exit__(self, exc_type, exc_val, exc_tb):
313 if exc_type is None:
314 self.ph.add_params(self.params)
316 def get(self, config_prop_name: str, dc_prop_name: str, parser: Optional[Callable[[Any], Any]] = None):
317 raise NoSectionNotification()
320 class CPHSection(CPHParamGetter):
321 """\
322 Класс производящий разбор конкретной секции конфигурации
323 """
325 def __init__(self, parser_helper_object, section: str):
326 super().__init__(parser_helper_object)
327 self.section_name = section
328 self.section = parser_helper_object.conf_parser[section]
330 def get(self, config_prop_name: str, dc_prop_name: str, parser: Optional[Callable[[Any], Any]] = None):
331 """\
332 :param config_prop_name: Имя опции в файле конфигурации
333 :param dc_prop_name: Имя свойства в классе данных, хранящем конфигурацию
334 :param parser: Исполнимый обработчик значения, перед его помещением в конфигурацию
335 """
336 try:
337 self._add_param(dc_prop_name, self.section.get(config_prop_name), parser)
339 except ConfigParseHelperError as e:
340 raise ConfigParseHelperError(f'Ошибка при разборе параметра "{config_prop_name}" '
341 f'в секции "{self.section_name}": {e}')
344 class CPHEnvParser(CPHParamGetter):
345 """\
346 Класс для разбора переменных окружения в том же ключе, что и файла конфигурации
347 """
349 def get(self, env_name: str, dc_prop_name: str, parser: Optional[Callable[[Any], Any]] = None):
350 """\
351 :param env_name: Имя переменной окружения, хранящей опцию
352 :param dc_prop_name: Имя свойства в классе данных, хранящем конфигурацию
353 :param parser: Исполнимый обработчик значения, перед его помещением в конфигурацию
354 """
356 try:
357 self._add_param(dc_prop_name, getenv(env_name), parser)
359 except ConfigParseHelperError as e:
360 raise ConfigParseHelperError(f'Ошибка при получении значения из переменной окружения "{env_name}": {e}')
363 class CPHObjectsListGetter:
364 """\
365 Помощник для случаев, когда в наборе секций хранится однотипный набор объектов
366 """
367 def __init__(self, config_object_class: type, config_parser: ConfigParser, sections: Iterable[str]):
368 self.sections = sections
369 self.conf_parser = config_parser
371 if not is_dataclass(config_object_class):
372 raise ConfigParseHelperError(f'Представленный в качестве представления объекта конфигурации '
373 f'класс не является классом данных: {config_object_class.__name__}')
375 self.res_obj = config_object_class
376 self.fields = dict((fld.name, fld) for fld in fields(config_object_class))
377 self.obj_list = []
378 self.ident_list = []
380 def add_params(self, params: Dict[str, Any]):
381 try:
382 self.obj_list.append(self.res_obj(**params))
384 except (ValueError, TypeError) as e:
385 raise ConfigParseHelperError(f'Ошибка создания объекта объекта конфигурации, '
386 f'списка объектов конфигурации: {e}')
388 def get(self, config_prop_name: str, dc_prop_name: str, parser: Optional[Callable[[Any], Any]] = None):
389 """\
390 Подготавливаем список соответствия названий параметров в секции конкретным свойствам данного
391 в помощник класса
393 :param config_prop_name: Имя опции в файле конфигурации
394 :param dc_prop_name: Имя свойства в классе данных, хранящем конфигурацию
395 :param parser: Исполнимый обработчик значения, перед его помещением в конфигурацию
396 """
398 self.ident_list.append((config_prop_name, dc_prop_name, parser))
400 def get_config_objects(self) -> List[object]:
401 for section in self.sections:
402 try:
403 with CPHSection(self, section) as section_helper:
404 for conf_prop, dc_prop, parser in self.ident_list:
405 section_helper.get(conf_prop, dc_prop, parser)
407 except ConfigParseHelperError as e:
408 raise ConfigParseHelperError(f'Ошибка при разборе секции "{section}": {e}')
410 res = self.obj_list[:]
411 self.obj_list.clear()
413 return res
416 class ConfigParseHelper:
417 """\
418 Помощник разбора конфигурации
419 """
420 def __init__(self, config_object_class: type, required_sections: Optional[Iterable[str]] = None):
421 """\
422 :param config_object_class: Dataclass, который мы подготовили как хранилище параметров конфигурации
423 :param required_sections: Перечисление секций конфигурации, которые мы требуем, чтобы были в файле
424 """
426 if required_sections is not None:
427 self.req_sections = set(required_sections)
429 else:
430 self.req_sections = set()
432 if not is_dataclass(config_object_class):
433 raise ConfigParseHelperError(f'Представленный в качестве объекта конфигурации класс не является '
434 f'классом данных: {config_object_class.__name__}')
436 self.res_obj = config_object_class
437 self.fields = dict((fld.name, fld) for fld in fields(config_object_class))
438 self.conf_parser: Optional[ConfigParser] = None
439 self.config_params = {}
440 self.config_params_lock = RLock()
442 def add_params(self, params: Dict[str, Any]):
443 self.config_params_lock.acquire()
444 try:
445 self.config_params.update(params)
447 finally:
448 self.config_params_lock.release()
450 def section(self, section_name: str) -> CPHSectionBase:
451 if self.conf_parser is None:
452 raise ConfigParseHelperError(f'Прежде чем приступать к разбору файла конфигурации стоит его загрузить')
454 if self.conf_parser.has_section(section_name):
455 return CPHSection(self, section_name)
457 else:
458 return CPHAbsentSection()
460 def load(self, filename: str):
461 res = ConfigParser()
462 try:
463 res.read(filename)
465 except (TypeError, IOError, OSError, ValueError) as e:
466 raise ConfigParseHelperError(f'Не удалось загрузить файл конфигурации: файл="{filename}" '
467 f'ошибка="{e}"')
469 missing_sections = self.req_sections - set(res.sections())
471 if missing_sections:
472 missing_sections = ', '.join(missing_sections)
473 raise ConfigParseHelperError(f'В конфигурационном файле отсутствуют секции: {missing_sections}')
475 self.conf_parser = res
477 def get_config(self):
478 try:
479 return self.res_obj(**self.config_params)
481 except (ValueError, TypeError) as e:
482 raise ConfigParseHelperError(f'Не удалось инициализировать объект конфигурации: {e}')
484 def get_sections(self):
485 return self.conf_parser.sections()