Coverage for custom_components/supernotify/archive.py: 88%
206 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
1import datetime as dt
2import json
3import logging
4from abc import abstractmethod
5from pathlib import Path
6from typing import TYPE_CHECKING, Any
8import aiofiles.os
9import anyio
10import homeassistant.util.dt as dt_util
11from homeassistant.const import (
12 CONF_DEBUG,
13 CONF_ENABLED,
14)
15from homeassistant.helpers import condition as condition
17from .const import (
18 CONF_ARCHIVE_DAYS,
19 CONF_ARCHIVE_DIAGNOSTICS,
20 CONF_ARCHIVE_EVENT_NAME,
21 CONF_ARCHIVE_EVENT_SELECTION,
22 CONF_ARCHIVE_MQTT_QOS,
23 CONF_ARCHIVE_MQTT_RETAIN,
24 CONF_ARCHIVE_MQTT_TOPIC,
25 CONF_ARCHIVE_PATH,
26 CONF_ARCHIVE_PURGE_INTERVAL,
27)
28from .schema import Outcome, OutcomeSelection
30if TYPE_CHECKING:
31 from homeassistant.helpers.typing import ConfigType
33 from custom_components.supernotify.hass_api import HomeAssistantAPI
35_LOGGER = logging.getLogger(__name__)
37ARCHIVE_PURGE_MIN_INTERVAL = 3 * 60
38ARCHIVE_DEFAULT_DAYS = 1
39WRITE_TEST = ".startup"
42class ArchivableObject:
43 @abstractmethod
44 def base_filename(self) -> str:
45 pass
47 @abstractmethod
48 def contents(self, diagnostics: bool = False, **_kwargs: Any) -> Any:
49 pass
51 def outcome(self) -> Outcome:
52 return Outcome.NO_DELIVERY
54 def selected(self, outcome_policy: OutcomeSelection) -> bool:
55 if outcome_policy & OutcomeSelection.NONE:
56 return False
57 return bool(
58 outcome_policy & OutcomeSelection.ALL
59 or (outcome_policy & OutcomeSelection.SUCCESS and self.outcome() == Outcome.SUCCESS)
60 or (outcome_policy & OutcomeSelection.NO_DELIVERY and self.outcome() == Outcome.NO_DELIVERY)
61 or (outcome_policy & OutcomeSelection.PARTIAL_DELIVERY and self.outcome() == Outcome.PARTIAL_DELIVERY)
62 or (outcome_policy & OutcomeSelection.DUPE and self.outcome() == Outcome.DUPE)
63 or (outcome_policy & OutcomeSelection.FALLBACK_DELIVERY and self.outcome() == Outcome.FALLBACK_DELIVERY)
64 or (outcome_policy & OutcomeSelection.ERROR and self.outcome() == Outcome.ERROR)
65 )
68class ArchiveDestination:
69 @abstractmethod
70 async def archive(self, archive_object: ArchivableObject) -> bool:
71 pass
74class EventArchiver(ArchiveDestination):
75 def __init__(
76 self, hass_api: HomeAssistantAPI, event_name: str, diagnostics: OutcomeSelection = OutcomeSelection.ERROR
77 ) -> None:
78 self.hass_api = hass_api
79 self.event_name = event_name
80 self.diagnostics = diagnostics
81 if diagnostics & OutcomeSelection.NONE:
82 pass
83 elif diagnostics & OutcomeSelection.ALL:
84 _LOGGER.info("SUPERNOTIFY archiving all notifications as %s events", event_name)
85 else:
86 if diagnostics & OutcomeSelection.SUCCESS:
87 _LOGGER.info("SUPERNOTIFY archiving successful notifications as %s events", event_name)
88 if diagnostics & OutcomeSelection.PARTIAL_DELIVERY:
89 _LOGGER.info("SUPERNOTIFY archiving partial delivery notifications as %s events", event_name)
91 if diagnostics & OutcomeSelection.FALLBACK_DELIVERY:
92 _LOGGER.info("SUPERNOTIFY archiving fallback notifications as %s events", event_name)
93 if diagnostics & OutcomeSelection.NO_DELIVERY:
94 _LOGGER.info("SUPERNOTIFY archiving no delivery notifications as %s events", event_name)
96 if diagnostics & OutcomeSelection.ERROR:
97 _LOGGER.info("SUPERNOTIFY archiving error notifications as %s events", event_name)
99 if diagnostics & OutcomeSelection.DUPE:
100 _LOGGER.info("SUPERNOTIFY archiving dupe notifications as %s events", event_name)
102 async def archive(self, archive_object: ArchivableObject) -> bool:
103 payload = archive_object.contents(diagnostics=archive_object.selected(self.diagnostics))
104 self.hass_api.fire_event(self.event_name, payload)
105 return True
108class ArchiveTopic(ArchiveDestination):
109 def __init__(
110 self,
111 hass_api: HomeAssistantAPI,
112 topic: str,
113 qos: int = 0,
114 retain: bool = True,
115 diagnostics: OutcomeSelection = OutcomeSelection.ERROR,
116 ) -> None:
117 self.hass_api: HomeAssistantAPI = hass_api
118 self.topic: str = topic
119 self.qos: int = qos
120 self.retain: bool = retain
121 self.diagnostics: OutcomeSelection = diagnostics
122 self.enabled: bool = False
124 async def initialize(self) -> None:
125 if await self.hass_api.mqtt_available(raise_on_error=False):
126 _LOGGER.info(f"SUPERNOTIFY Archiving to MQTT topic {self.topic}, qos {self.qos}, retain {self.retain}")
127 self.enabled = True
128 else:
129 _LOGGER.warning(
130 f"SUPERNOTIFY archiving configured for topic {self.topic} but MQTTT not available at startup, disabled"
131 )
133 async def archive(self, archive_object: ArchivableObject) -> bool:
134 if not self.enabled:
135 return False
136 payload = archive_object.contents(diagnostics=archive_object.selected(self.diagnostics))
137 topic = f"{self.topic}/{archive_object.base_filename()}"
138 _LOGGER.debug(f"SUPERNOTIFY Publishing notification to {topic}")
139 try:
140 await self.hass_api.mqtt_publish(
141 topic=topic,
142 payload=payload,
143 qos=self.qos,
144 retain=self.retain,
145 )
146 return True
147 except Exception:
148 _LOGGER.warning(f"SUPERNOTIFY failed to archive to topic {self.topic}")
149 return False
152class ArchiveDirectory(ArchiveDestination):
153 def __init__(self, path: str, purge_minute_interval: int, diagnostics: OutcomeSelection = OutcomeSelection.ERROR) -> None:
154 self.configured_path: str = path
155 self.archive_path: anyio.Path | None = None
156 self.enabled: bool = False
157 self.diagnostics: OutcomeSelection = diagnostics
158 self.last_purge: dt.datetime | None = None
159 self.purge_minute_interval: int = purge_minute_interval
161 async def initialize(self) -> None:
162 verify_archive_path: anyio.Path = anyio.Path(self.configured_path)
163 if verify_archive_path and not await verify_archive_path.exists():
164 _LOGGER.info("SUPERNOTIFY archive path not found at %s", verify_archive_path)
165 try:
166 await verify_archive_path.mkdir(parents=True, exist_ok=True)
167 except Exception as e:
168 _LOGGER.warning("SUPERNOTIFY archive path %s cannot be created: %s", verify_archive_path, e)
169 if verify_archive_path and await verify_archive_path.exists() and await verify_archive_path.is_dir():
170 try:
171 await verify_archive_path.joinpath(WRITE_TEST).touch(exist_ok=True)
172 self.archive_path = verify_archive_path
173 _LOGGER.info("SUPERNOTIFY archiving notifications to file system at %s", verify_archive_path)
174 self.enabled = True
175 except Exception as e:
176 _LOGGER.warning("SUPERNOTIFY archive path %s cannot be written: %s", verify_archive_path, e)
177 else:
178 _LOGGER.warning("SUPERNOTIFY archive path %s is not a directory or does not exist", verify_archive_path)
180 async def archive(self, archive_object: ArchivableObject) -> bool:
181 archived: bool = False
183 if self.enabled and self.archive_path: # archive_path to assuage mypy
184 archive_filepath: anyio.Path | None = None
185 diagnostics: bool = archive_object.selected(self.diagnostics)
186 try:
187 filename = f"{archive_object.base_filename()}.json"
188 archive_filepath = self.archive_path.joinpath(filename)
189 serialized: str = json.dumps(archive_object.contents(diagnostics=diagnostics), indent=2)
190 async with aiofiles.open(archive_filepath, mode="w") as file:
191 await file.write(serialized)
192 _LOGGER.debug("SUPERNOTIFY Archived notification %s", await archive_filepath.absolute())
193 archived = True
194 except Exception as e:
195 _LOGGER.warning("SUPERNOTIFY Unable to archive notification: %s", e)
196 if diagnostics and archive_filepath:
197 try:
198 serialized = json.dumps(archive_object.contents(diagnostics=False), indent=2)
199 async with aiofiles.open(archive_filepath, mode="w") as file:
200 await file.write(serialized)
201 _LOGGER.warning("SUPERNOTIFY Archived minimal notification %s", await archive_filepath.absolute())
202 archived = True
203 except Exception as e2:
204 _LOGGER.exception("SUPERNOTIFY Unable to archive minimal notification: %s", e2)
205 return archived
207 async def size(self) -> int:
208 path = self.archive_path
209 if path and await path.exists():
210 return sum(1 for p in await aiofiles.os.listdir(path) if p != WRITE_TEST)
211 return 0
213 async def cleanup(self, days: int, force: bool) -> int:
214 if (
215 not force
216 and self.last_purge is not None
217 and self.last_purge > dt.datetime.now(dt.UTC) - dt.timedelta(minutes=self.purge_minute_interval)
218 ):
219 return 0
221 cutoff = dt.datetime.now(dt.UTC) - dt.timedelta(days=days)
222 cutoff = cutoff.astimezone(dt.UTC)
223 purged = 0
224 if self.archive_path and await self.archive_path.exists():
225 try:
226 archive = await aiofiles.os.scandir(self.archive_path)
227 for entry in archive:
228 if entry.name == ".startup":
229 continue
230 if dt_util.utc_from_timestamp(entry.stat().st_ctime) <= cutoff:
231 _LOGGER.debug("SUPERNOTIFY Purging %s", entry.path)
232 await aiofiles.os.unlink(Path(entry.path))
233 purged += 1
234 except Exception as e:
235 _LOGGER.warning("SUPERNOTIFY Unable to clean up archive at %s: %s", self.archive_path, e, exc_info=True)
236 _LOGGER.info("SUPERNOTIFY Purged %s archived notifications for cutoff %s", purged, cutoff)
237 self.last_purge = dt.datetime.now(dt.UTC)
238 else:
239 _LOGGER.debug("SUPERNOTIFY Skipping archive purge for unknown path %s", self.archive_path)
240 return purged
243class NotificationArchive:
244 def __init__(
245 self,
246 config: ConfigType,
247 hass_api: HomeAssistantAPI,
248 ) -> None:
249 self.hass_api = hass_api
250 self.enabled = bool(config.get(CONF_ENABLED, False))
251 self.archive_directory: ArchiveDirectory | None = None
252 self.archive_topic: ArchiveTopic | None = None
253 self.event_archiver: EventArchiver | None = None
254 self.event_selection: OutcomeSelection = config.get(CONF_ARCHIVE_EVENT_SELECTION, OutcomeSelection.NONE)
255 self.diagnostics: OutcomeSelection = config.get(CONF_ARCHIVE_DIAGNOSTICS, OutcomeSelection.ERROR)
256 self.archive_event_name: str = config.get(CONF_ARCHIVE_EVENT_NAME, "supernotification")
257 self.configured_archive_path: str | None = config.get(CONF_ARCHIVE_PATH)
258 self.archive_days = int(config.get(CONF_ARCHIVE_DAYS, ARCHIVE_DEFAULT_DAYS))
259 self.mqtt_topic: str | None = config.get(CONF_ARCHIVE_MQTT_TOPIC)
260 self.mqtt_qos: int = int(config.get(CONF_ARCHIVE_MQTT_QOS, 0))
261 self.mqtt_retain: bool = bool(config.get(CONF_ARCHIVE_MQTT_RETAIN, True))
262 self.debug: bool = bool(config.get(CONF_DEBUG, False))
264 self.purge_minute_interval = int(config.get(CONF_ARCHIVE_PURGE_INTERVAL, ARCHIVE_PURGE_MIN_INTERVAL))
266 async def initialize(self) -> None:
267 if not self.enabled:
268 _LOGGER.info("SUPERNOTIFY Archive disabled")
269 return
270 if not self.configured_archive_path:
271 _LOGGER.warning("SUPERNOTIFY archive path not configured")
272 else:
273 self.archive_directory = ArchiveDirectory(
274 self.configured_archive_path, purge_minute_interval=self.purge_minute_interval, diagnostics=self.diagnostics
275 )
276 await self.archive_directory.initialize()
278 if self.mqtt_topic is not None:
279 self.archive_topic = ArchiveTopic(self.hass_api, self.mqtt_topic, self.mqtt_qos, self.mqtt_retain, self.diagnostics)
280 await self.archive_topic.initialize()
282 self.event_archiver = EventArchiver(self.hass_api, self.archive_event_name, self.diagnostics)
284 async def size(self) -> int:
285 return await self.archive_directory.size() if self.archive_directory else 0
287 async def cleanup(self, days: int | None = None, force: bool = False) -> int:
288 days = days or self.archive_days
289 return await self.archive_directory.cleanup(days, force) if self.archive_directory else 0
291 async def archive(self, archive_object: ArchivableObject) -> bool:
292 archived: bool = False
293 if self.archive_topic:
294 if await self.archive_topic.archive(archive_object):
295 archived = True
296 if self.archive_directory:
297 if await self.archive_directory.archive(archive_object):
298 archived = True
299 if self.event_archiver and archive_object.selected(self.event_selection):
300 await self.event_archiver.archive(archive_object)
302 return archived