tools.win_pg_dump_controller
5:7de4fb4ef2f5
Go to Latest
tools.win_pg_dump_controller/win_pg_dump_controller/store_controller.py
. Не тот порт по умолчанию для postgres
4 from datetime import datetime
5 from os.path import exists, join as p_join, basename
6 from os import remove as file_remove
7 from typing import Dict, List
9 from .error import Error
10 from .config import DBTask
13 class StoreError(Error):
class StoreControllerBase(object):
    """Base interface for store controllers that map file names to paths."""

    def get_filename(self, filename: str) -> str:
        """
        Return the storage path for *filename*.

        Subclasses must override this method.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # ``NotImplemented`` is a comparison sentinel, not an exception, and
        # calling it raises TypeError; NotImplementedError is the correct
        # signal for an abstract method.
        raise NotImplementedError
22 class IndexItem(object):
23 def __init__(self, controller: StoreControllerBase, time: int, filename: str, epoch: int = 0):
24 self._controller = controller
25 self.filename = filename
31 if self._my_path is None:
32 self._my_path = self._controller.get_filename(self.filename)
def set_epoch(self, num_days: int):
    """
    Set this item's epoch number from the length of an epoch in days.

    The epoch is the whole number of ``num_days``-long periods contained in
    ``self.time`` (a timestamp in seconds).
    """
    seconds_per_day = 86400
    epoch_length = num_days * seconds_per_day
    self.epoch = self.time // epoch_length
42 def __str__(self) -> str:
def full_desc(self) -> str:
    """Return the filename followed by the item's epoch and creation datetime."""
    description = f'{self.filename} [epoch="{self.epoch}" created={self.get_datetime()}]'
    return description
def get_datetime(self):
    """Return ``self.time`` (a timestamp in seconds) as a local ``datetime``."""
    stamp = self.time
    return datetime.fromtimestamp(stamp)
def is_exists(self) -> bool:
    """Report whether the path returned by ``get_path()`` exists on disk."""
    path = self.get_path()
    return exists(path)
def to_dict(self) -> dict:
    """Return the item's ``filename``, ``time`` and ``epoch`` as a plain dict."""
    fields = ('filename', 'time', 'epoch')
    return {name: getattr(self, name) for name in fields}
58 def from_dict(cls, controller: StoreControllerBase, d: Dict):
60 controller=controller,
65 def new(cls, controller: StoreControllerBase, short_filename: str):
67 time_prefix = time.strftime('%Y-%m-%d_%H-%M-%S')
68 filename = f'{time_prefix} - {short_filename}'
71 controller=controller,
72 time=int(time.timestamp()),
77 class StoreController(StoreControllerBase):
78 def __init__(self, task: DBTask):
80 self.index_name = self.get_filename(f'{task.name}.index')
81 self.idx: Dict[int, IndexItem] = {}
82 self.op_adv_status = []
84 if exists(self.index_name):
def get_filename(self, filename: str) -> str:
    """Return *filename* joined onto the task's destination directory."""
    full_path = p_join(self.task.dst_dir, filename)
    return str(full_path)
90 def load_index(self) -> None:
91 with open(self.index_name, 'r') as IN:
93 for itm in json.load(IN):
94 itm = IndexItem.from_dict(self, itm)
96 result[itm.time] = itm
def set_epoch(self, num_days: int):
    """Recompute the epoch of every indexed item for epochs of *num_days* days."""
    for entry in self.idx.values():
        entry.set_epoch(num_days)
def save_index(self) -> None:
    """Write every indexed item, as a dict, to the index file as a JSON list."""
    with open(self.index_name, 'w') as out_file:
        # Build the records after opening the file, as the original did.
        records = [entry.to_dict() for entry in self.idx.values()]
        json.dump(records, out_file)
def add_op_status(self, msg: str) -> None:
    """Append *msg* to the ``op_adv_status`` message list."""
    status_log = self.op_adv_status
    status_log.append(msg)
def new_item(self) -> IndexItem:
    """Create a new index item for ``<task name>.backup`` bound to this controller."""
    short_name = f'{self.task.name}.backup'
    return IndexItem.new(self, short_name)
def add_item(self, item: IndexItem) -> None:
    """
    Register *item* in the index under its ``time`` key.

    If a differently named file is already indexed under the same time, that
    older file is deleted from disk (when it still exists) before the entry
    is replaced.

    Raises:
        StoreError: if the file behind *item* does not exist.
    """
    if not item.is_exists():
        raise StoreError(f'Storing to index file not found: {item.get_path()}')

    previous = self.idx.get(item.time)
    if previous is not None and previous.filename != item.filename and previous.is_exists():
        file_remove(previous.get_path())

    self.idx[item.time] = item
124 def remove(self, item: IndexItem) -> None:
125 if item.time in self.idx:
126 del self.idx[item.time]
129 file_remove(item.get_path())
132 for i in sorted(self.idx, reverse=True):
135 def clean(self, tier1_days: int, tier2_store_days: int, epoch_days: int) -> List[str]:
140 self.set_epoch(epoch_days)
143 if not item.is_exists():
144 to_remove.append(item)
147 storing_days = (now - item.get_datetime()).days
149 if not storing_days <= tier1_days:
150 if storing_days > tier2_store_days:
151 to_remove.append(item)
154 tier2_storing_days = storing_days - tier1_days
156 self.add_op_status(f'"{item.full_desc()}": storing_days={storing_days}'
157 f' tier2_storing_days={tier2_storing_days}')
159 if item.epoch in tier2_idx:
160 to_remove.append(tier2_idx[item.epoch])
162 tier2_idx[item.epoch] = item
165 for item in to_remove:
166 file_remove(item.get_path())
167 result.append(item.filename)