tools.win_pg_dump_controller
2:7c93b0305522
Go to Latest
tools.win_pg_dump_controller/win_pg_dump_controller/store_controller.py
+ Возможность отправлять оповещения по почте
4 from datetime import datetime
5 from os.path import exists, join as p_join, basename
6 from os import remove as file_remove
7 from typing import Dict, List
9 from .error import Error
10 from .config import DBTask
13 class StoreError(Error):
17 class StoreControllerBase(object):
18 def get_filename(self, filename: str) -> str:
19 raise NotImplemented()
22 class IndexItem(object):
23 def __init__(self, controller: StoreControllerBase, time: int, filename: str):
24 self._controller = controller
25 self.filename = filename
30 if self._my_path is None:
31 self._my_path = self._controller.get_filename(self.filename)
35 def __str__(self) -> str:
38 def get_datetime(self):
39 return datetime.fromtimestamp(self.time)
41 def is_exists(self) -> bool:
42 return exists(self.get_path())
44 def to_dict(self) -> dict:
45 return dict((i, getattr(self, i)) for i in ('filename', 'time'))
48 def from_dict(cls, controller: StoreControllerBase, d: Dict):
50 controller=controller,
55 def new(cls, controller: StoreControllerBase, short_filename: str):
57 time_prefix = time.strftime('%Y-%m-%d_%H-%M-%S')
58 filename = f'{time_prefix} - {short_filename}'
61 controller=controller,
62 time=int(time.timestamp()),
67 class StoreController(StoreControllerBase):
68 def __init__(self, task: DBTask):
70 self.index_name = self.get_filename(f'{task.name}.index')
71 self.idx: Dict[int, IndexItem] = {}
72 self.op_adv_status = []
74 if exists(self.index_name):
77 def get_filename(self, filename: str) -> str:
78 return str(p_join(self.task.dst_dir, filename))
80 def load_index(self) -> None:
81 with open(self.index_name, 'r') as IN:
83 for itm in json.load(IN):
84 itm = IndexItem.from_dict(self, itm)
86 result[itm.time] = itm
90 def save_index(self) -> None:
91 with open(self.index_name, 'w') as OUT:
92 json.dump(list(map(lambda x: x.to_dict(), self.idx.values())), OUT)
94 def add_op_status(self, msg: str) -> None:
95 self.op_adv_status.append(msg)
97 def new_item(self) -> IndexItem:
98 return IndexItem.new(self, f'{self.task.name}.backup')
100 def add_item(self, item: IndexItem) -> None:
101 if not item.is_exists():
102 raise StoreError(f'Storing to index file not found: {item.get_path()}')
104 if item.time in self.idx and self.idx[item.time].filename != item.filename:
105 if self.idx[item.time].is_exists():
106 file_remove(self.idx[item.time].get_path())
108 self.idx[item.time] = item
110 def remove(self, item: IndexItem) -> None:
111 if item.time in self.idx:
112 del self.idx[item.time]
115 file_remove(item.get_path())
118 for i in sorted(self.idx, reverse=True):
121 def clean(self, tier1_days: int, tier2_copies_interval: int, tier2_store_days: int) -> List[str]:
127 if not item.is_exists():
128 to_remove.append(item)
131 storing_days = (now - item.get_datetime()).days
133 if not storing_days <= tier1_days:
134 if storing_days > tier2_store_days:
135 to_remove.append(item)
138 # Магия: Делим старые резервные копии на эпохи. Эпоха - целая часть от деления количества
139 # дней, которое лежит резервная копия, на интервал, за который нам нужно хранить
140 # хотя бы одну копию. В одной эпохе старший файл вытесняет младший. Из вычислений
141 # убираем период tier1
143 storing_days_clean = storing_days - tier1_days
145 _epoch = divmod(storing_days_clean, tier2_copies_interval)[0]
146 self.add_op_status(f'"{item}": epoch={_epoch} storing_days={storing_days}'
147 f' clean_storing_days={storing_days_clean}')
149 if _epoch in tier2_idx:
150 to_remove.append(tier2_idx[_epoch])
152 tier2_idx[_epoch] = item
155 for item in to_remove:
156 file_remove(item.get_path())
157 result.append(item.filename)