Coverage for custom_components/supernotify/archive.py: 88%

206 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-01 15:06 +0000

1import datetime as dt 

2import json 

3import logging 

4from abc import abstractmethod 

5from pathlib import Path 

6from typing import TYPE_CHECKING, Any 

7 

8import aiofiles.os 

9import anyio 

10import homeassistant.util.dt as dt_util 

11from homeassistant.const import ( 

12 CONF_DEBUG, 

13 CONF_ENABLED, 

14) 

15from homeassistant.helpers import condition as condition 

16 

17from .const import ( 

18 CONF_ARCHIVE_DAYS, 

19 CONF_ARCHIVE_DIAGNOSTICS, 

20 CONF_ARCHIVE_EVENT_NAME, 

21 CONF_ARCHIVE_EVENT_SELECTION, 

22 CONF_ARCHIVE_MQTT_QOS, 

23 CONF_ARCHIVE_MQTT_RETAIN, 

24 CONF_ARCHIVE_MQTT_TOPIC, 

25 CONF_ARCHIVE_PATH, 

26 CONF_ARCHIVE_PURGE_INTERVAL, 

27) 

28from .schema import Outcome, OutcomeSelection 

29 

30if TYPE_CHECKING: 

31 from homeassistant.helpers.typing import ConfigType 

32 

33 from custom_components.supernotify.hass_api import HomeAssistantAPI 

34 

35_LOGGER = logging.getLogger(__name__) 

36 

# Fallback purge interval in minutes (3 hours), used when no purge interval is configured
ARCHIVE_PURGE_MIN_INTERVAL: int = 3 * 60
# Fallback retention period in days, used when no archive day count is configured
ARCHIVE_DEFAULT_DAYS: int = 1
# Name of the marker file touched in the archive directory to prove it is writable
WRITE_TEST: str = ".startup"

40 

41 

class ArchivableObject:
    """Contract for objects that can be handed to an archive destination.

    Implementations provide a base file name and the content to store; the
    ``selected`` helper matches the object's outcome against a selection policy.
    """

    @abstractmethod
    def base_filename(self) -> str:
        """Return the name, without extension, used to identify this object in an archive."""

    @abstractmethod
    def contents(self, diagnostics: bool = False, **_kwargs: Any) -> Any:
        """Return the content to archive; ``diagnostics`` is supplied by the archive destination."""

    def outcome(self) -> Outcome:
        """Report the outcome of this object; this base implementation always reports ``NO_DELIVERY``."""
        return Outcome.NO_DELIVERY

    def selected(self, outcome_policy: OutcomeSelection) -> bool:
        """Tell whether this object's outcome is covered by ``outcome_policy``.

        ``NONE`` in the policy always rejects and is checked first; ``ALL``
        always accepts; otherwise the object is accepted when a flag set in the
        policy corresponds to the value returned by ``outcome()``.
        """
        if outcome_policy & OutcomeSelection.NONE:
            return False
        if outcome_policy & OutcomeSelection.ALL:
            return True

        # Policy flag paired with the outcome it selects, in evaluation order
        pairings = (
            (OutcomeSelection.SUCCESS, Outcome.SUCCESS),
            (OutcomeSelection.NO_DELIVERY, Outcome.NO_DELIVERY),
            (OutcomeSelection.PARTIAL_DELIVERY, Outcome.PARTIAL_DELIVERY),
            (OutcomeSelection.DUPE, Outcome.DUPE),
            (OutcomeSelection.FALLBACK_DELIVERY, Outcome.FALLBACK_DELIVERY),
            (OutcomeSelection.ERROR, Outcome.ERROR),
        )
        for flag, matching_outcome in pairings:
            # outcome() is only consulted for flags that are actually set
            if outcome_policy & flag and self.outcome() == matching_outcome:
                return True
        return False

66 

67 

class ArchiveDestination:
    """Common interface of the archive back ends defined in this module."""

    @abstractmethod
    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Store ``archive_object``; implementations return whether it was archived."""

72 

73 

class EventArchiver(ArchiveDestination):
    """Archive destination that fires each archived object as an event through ``hass_api``."""

    def __init__(
        self, hass_api: HomeAssistantAPI, event_name: str, diagnostics: OutcomeSelection = OutcomeSelection.ERROR
    ) -> None:
        """Store the collaborators and log one line per outcome flag set in ``diagnostics``.

        Nothing is logged when ``NONE`` is set; a single line is logged when ``ALL`` is set.
        """
        self.hass_api = hass_api
        self.event_name = event_name
        self.diagnostics = diagnostics

        if diagnostics & OutcomeSelection.NONE:
            return
        if diagnostics & OutcomeSelection.ALL:
            _LOGGER.info("SUPERNOTIFY archiving all notifications as %s events", event_name)
            return

        # NOTE(review): the messages speak of archiving, while the flags tested are the
        # diagnostics selection - confirm the wording is intended
        announcements = (
            (OutcomeSelection.SUCCESS, "SUPERNOTIFY archiving successful notifications as %s events"),
            (OutcomeSelection.PARTIAL_DELIVERY, "SUPERNOTIFY archiving partial delivery notifications as %s events"),
            (OutcomeSelection.FALLBACK_DELIVERY, "SUPERNOTIFY archiving fallback notifications as %s events"),
            (OutcomeSelection.NO_DELIVERY, "SUPERNOTIFY archiving no delivery notifications as %s events"),
            (OutcomeSelection.ERROR, "SUPERNOTIFY archiving error notifications as %s events"),
            (OutcomeSelection.DUPE, "SUPERNOTIFY archiving dupe notifications as %s events"),
        )
        for flag, message in announcements:
            if diagnostics & flag:
                _LOGGER.info(message, event_name)

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Fire ``archive_object`` as an event named ``event_name``; always returns ``True``."""
        with_diagnostics = archive_object.selected(self.diagnostics)
        event_data = archive_object.contents(diagnostics=with_diagnostics)
        self.hass_api.fire_event(self.event_name, event_data)
        return True

106 

107 

class ArchiveTopic(ArchiveDestination):
    """Archive destination that publishes each archived object to MQTT through ``hass_api``.

    Publishing stays disabled until ``initialize`` has confirmed MQTT is available.
    """

    def __init__(
        self,
        hass_api: HomeAssistantAPI,
        topic: str,
        qos: int = 0,
        retain: bool = True,
        diagnostics: OutcomeSelection = OutcomeSelection.ERROR,
    ) -> None:
        """Store the publishing options; the destination starts out disabled."""
        self.hass_api: HomeAssistantAPI = hass_api
        # Base topic; the object's base filename is appended for each publication
        self.topic: str = topic
        self.qos: int = qos
        self.retain: bool = retain
        # Outcomes for which contents are requested with diagnostics
        self.diagnostics: OutcomeSelection = diagnostics
        self.enabled: bool = False

    async def initialize(self) -> None:
        """Enable the destination if ``hass_api`` reports MQTT as available, otherwise warn."""
        if await self.hass_api.mqtt_available(raise_on_error=False):
            _LOGGER.info("SUPERNOTIFY Archiving to MQTT topic %s, qos %s, retain %s", self.topic, self.qos, self.retain)
            self.enabled = True
        else:
            _LOGGER.warning(
                "SUPERNOTIFY archiving configured for topic %s but MQTT not available at startup, disabled", self.topic
            )

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Publish ``archive_object`` under ``<topic>/<base_filename>``.

        Returns ``True`` when the publish call completes, ``False`` when the
        destination is disabled or the publish call raises.
        """
        if not self.enabled:
            return False
        payload = archive_object.contents(diagnostics=archive_object.selected(self.diagnostics))
        topic = f"{self.topic}/{archive_object.base_filename()}"
        _LOGGER.debug("SUPERNOTIFY Publishing notification to %s", topic)
        try:
            await self.hass_api.mqtt_publish(
                topic=topic,
                payload=payload,
                qos=self.qos,
                retain=self.retain,
            )
        except Exception as e:
            # Best effort: a failed publication is reported, with its cause, but never propagated
            _LOGGER.warning("SUPERNOTIFY failed to archive to topic %s: %s", topic, e)
            return False
        return True

150 

151 

class ArchiveDirectory(ArchiveDestination):
    """Archive destination that writes each archived object as a JSON file in a directory.

    The directory is verified (and created if necessary) by ``initialize``;
    until that succeeds the destination is disabled and ``archive`` does nothing.
    """

    def __init__(
        self, path: str, purge_minute_interval: int, diagnostics: OutcomeSelection = OutcomeSelection.ERROR
    ) -> None:
        """Store the configuration; the destination starts out disabled."""
        self.configured_path: str = path
        # Set by initialize() once the directory has been shown to be writable
        self.archive_path: anyio.Path | None = None
        self.enabled: bool = False
        # Outcomes for which contents are requested with diagnostics
        self.diagnostics: OutcomeSelection = diagnostics
        # Time of the last purge run, used by cleanup() to throttle non-forced runs
        self.last_purge: dt.datetime | None = None
        self.purge_minute_interval: int = purge_minute_interval

    async def initialize(self) -> None:
        """Create the configured directory if missing and enable the destination if it is writable."""
        verify_archive_path: anyio.Path = anyio.Path(self.configured_path)
        if not await verify_archive_path.exists():
            _LOGGER.info("SUPERNOTIFY archive path not found at %s", verify_archive_path)
            try:
                await verify_archive_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY archive path %s cannot be created: %s", verify_archive_path, e)
        if await verify_archive_path.exists() and await verify_archive_path.is_dir():
            try:
                # Touching the marker file proves the directory can be written to
                await verify_archive_path.joinpath(WRITE_TEST).touch(exist_ok=True)
                self.archive_path = verify_archive_path
                _LOGGER.info("SUPERNOTIFY archiving notifications to file system at %s", verify_archive_path)
                self.enabled = True
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY archive path %s cannot be written: %s", verify_archive_path, e)
        else:
            _LOGGER.warning("SUPERNOTIFY archive path %s is not a directory or does not exist", verify_archive_path)

    async def _write_json(self, filepath: anyio.Path, archive_object: ArchivableObject, diagnostics: bool) -> None:
        """Serialize the object's contents as indented JSON and write them to ``filepath``."""
        serialized: str = json.dumps(archive_object.contents(diagnostics=diagnostics), indent=2)
        async with aiofiles.open(filepath, mode="w") as file:
            await file.write(serialized)

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Write ``archive_object`` to ``<base_filename>.json`` in the archive directory.

        If writing with diagnostics fails, one further attempt is made without
        diagnostics. Returns ``True`` when either attempt succeeds, ``False``
        when the destination is disabled or both attempts fail.
        """
        archived: bool = False

        if self.enabled and self.archive_path:  # archive_path to assuage mypy
            archive_filepath: anyio.Path | None = None
            diagnostics: bool = archive_object.selected(self.diagnostics)
            try:
                filename = f"{archive_object.base_filename()}.json"
                archive_filepath = self.archive_path.joinpath(filename)
                await self._write_json(archive_filepath, archive_object, diagnostics)
                _LOGGER.debug("SUPERNOTIFY Archived notification %s", await archive_filepath.absolute())
                archived = True
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to archive notification: %s", e)
                # Only retry when the failed attempt included diagnostics and a target path was built
                if diagnostics and archive_filepath:
                    try:
                        await self._write_json(archive_filepath, archive_object, False)
                        _LOGGER.warning("SUPERNOTIFY Archived minimal notification %s", await archive_filepath.absolute())
                        archived = True
                    except Exception as e2:
                        _LOGGER.exception("SUPERNOTIFY Unable to archive minimal notification: %s", e2)
        return archived

    async def size(self) -> int:
        """Return the number of entries in the archive directory, not counting the write-test marker."""
        path = self.archive_path
        if path and await path.exists():
            return sum(1 for p in await aiofiles.os.listdir(path) if p != WRITE_TEST)
        return 0

    async def cleanup(self, days: int, force: bool) -> int:
        """Delete archive entries older than ``days`` days and return how many were removed.

        Unless ``force`` is set, the run is skipped (returning 0) when the last
        purge happened less than ``purge_minute_interval`` minutes ago. An entry
        that cannot be inspected or deleted is logged and skipped so that it
        does not stop the remaining entries from being purged.
        """
        now = dt.datetime.now(dt.UTC)
        if (
            not force
            and self.last_purge is not None
            and self.last_purge > now - dt.timedelta(minutes=self.purge_minute_interval)
        ):
            return 0

        cutoff = now - dt.timedelta(days=days)
        purged = 0
        if self.archive_path and await self.archive_path.exists():
            try:
                archive = await aiofiles.os.scandir(self.archive_path)
                for entry in archive:
                    if entry.name == WRITE_TEST:
                        continue
                    try:
                        # NOTE(review): st_ctime is the metadata change time on Unix rather than
                        # the creation time - confirm this is the intended measure of age
                        if dt_util.utc_from_timestamp(entry.stat().st_ctime) <= cutoff:
                            _LOGGER.debug("SUPERNOTIFY Purging %s", entry.path)
                            await aiofiles.os.unlink(Path(entry.path))
                            purged += 1
                    except Exception as e:
                        _LOGGER.warning("SUPERNOTIFY Unable to purge %s: %s", entry.path, e)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to clean up archive at %s: %s", self.archive_path, e, exc_info=True)
            _LOGGER.info("SUPERNOTIFY Purged %s archived notifications for cutoff %s", purged, cutoff)
            self.last_purge = dt.datetime.now(dt.UTC)
        else:
            _LOGGER.debug("SUPERNOTIFY Skipping archive purge for unknown path %s", self.archive_path)
        return purged

241 

242 

class NotificationArchive:
    """Front end that reads the archive configuration and fans objects out to the destinations.

    Destinations (directory, MQTT topic, event) are only created by
    ``initialize`` and only when the archive is enabled.
    """

    def __init__(
        self,
        config: ConfigType,
        hass_api: HomeAssistantAPI,
    ) -> None:
        """Read every archive option from ``config``, applying defaults for missing keys."""
        option = config.get

        self.hass_api = hass_api
        self.enabled = bool(option(CONF_ENABLED, False))

        # Destinations stay unset until initialize() creates them
        self.archive_directory: ArchiveDirectory | None = None
        self.archive_topic: ArchiveTopic | None = None
        self.event_archiver: EventArchiver | None = None

        self.event_selection: OutcomeSelection = option(CONF_ARCHIVE_EVENT_SELECTION, OutcomeSelection.NONE)
        self.diagnostics: OutcomeSelection = option(CONF_ARCHIVE_DIAGNOSTICS, OutcomeSelection.ERROR)
        self.archive_event_name: str = option(CONF_ARCHIVE_EVENT_NAME, "supernotification")
        self.configured_archive_path: str | None = option(CONF_ARCHIVE_PATH)
        self.archive_days = int(option(CONF_ARCHIVE_DAYS, ARCHIVE_DEFAULT_DAYS))
        self.mqtt_topic: str | None = option(CONF_ARCHIVE_MQTT_TOPIC)
        self.mqtt_qos: int = int(option(CONF_ARCHIVE_MQTT_QOS, 0))
        self.mqtt_retain: bool = bool(option(CONF_ARCHIVE_MQTT_RETAIN, True))
        self.debug: bool = bool(option(CONF_DEBUG, False))

        self.purge_minute_interval = int(option(CONF_ARCHIVE_PURGE_INTERVAL, ARCHIVE_PURGE_MIN_INTERVAL))

    async def initialize(self) -> None:
        """Create and initialize the configured destinations; does nothing when the archive is disabled."""
        if not self.enabled:
            _LOGGER.info("SUPERNOTIFY Archive disabled")
            return

        if self.configured_archive_path:
            self.archive_directory = ArchiveDirectory(
                self.configured_archive_path, purge_minute_interval=self.purge_minute_interval, diagnostics=self.diagnostics
            )
            await self.archive_directory.initialize()
        else:
            _LOGGER.warning("SUPERNOTIFY archive path not configured")

        if self.mqtt_topic is not None:
            self.archive_topic = ArchiveTopic(self.hass_api, self.mqtt_topic, self.mqtt_qos, self.mqtt_retain, self.diagnostics)
            await self.archive_topic.initialize()

        # The event destination is always created; archive() decides per object whether to use it
        self.event_archiver = EventArchiver(self.hass_api, self.archive_event_name, self.diagnostics)

    async def size(self) -> int:
        """Return the size reported by the directory destination, or 0 when there is none."""
        if self.archive_directory:
            return await self.archive_directory.size()
        return 0

    async def cleanup(self, days: int | None = None, force: bool = False) -> int:
        """Purge the directory destination and return the number of entries removed (0 without a directory).

        ``days`` falls back to the configured ``archive_days`` when it is falsy.
        """
        # NOTE(review): days=0 is falsy and therefore also falls back to archive_days - confirm intended
        retention_days = days or self.archive_days
        if self.archive_directory:
            return await self.archive_directory.cleanup(retention_days, force)
        return 0

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Send ``archive_object`` to every destination and report whether topic or directory stored it.

        The event destination is used only when the object matches
        ``event_selection``, and its result does not affect the return value.
        """
        archived: bool = False
        if self.archive_topic and await self.archive_topic.archive(archive_object):
            archived = True
        if self.archive_directory and await self.archive_directory.archive(archive_object):
            archived = True
        if self.event_archiver and archive_object.selected(self.event_selection):
            await self.event_archiver.archive(archive_object)

        return archived