Coverage for custom_components/supernotify/archive.py: 24%
158 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-06 15:56 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-06 15:56 +0000
1import datetime as dt
2import logging
3from abc import abstractmethod
4from pathlib import Path
5from typing import Any
7import aiofiles.os
8import anyio
9import homeassistant.util.dt as dt_util
10from homeassistant.const import (
11 CONF_DEBUG,
12 CONF_ENABLED,
13)
14from homeassistant.helpers import condition as condition
15from homeassistant.helpers.json import save_json
16from homeassistant.helpers.typing import ConfigType
18from custom_components.supernotify.hass_api import HomeAssistantAPI
20from .const import (
21 CONF_ARCHIVE_DAYS,
22 CONF_ARCHIVE_MQTT_QOS,
23 CONF_ARCHIVE_MQTT_RETAIN,
24 CONF_ARCHIVE_MQTT_TOPIC,
25 CONF_ARCHIVE_PATH,
26 CONF_ARCHIVE_PURGE_INTERVAL,
27)
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Fallback for CONF_ARCHIVE_PURGE_INTERVAL. The value ends up in
# dt.timedelta(minutes=...) inside ArchiveDirectory.cleanup, so it is in minutes.
ARCHIVE_PURGE_MIN_INTERVAL = 3 * 60
# Fallback for CONF_ARCHIVE_DAYS: how many days of archive cleanup keeps by default.
ARCHIVE_DEFAULT_DAYS = 1
# Marker file touched in the archive directory to verify it is writable;
# excluded from the archive size count.
WRITE_TEST = ".startup"
class ArchivableObject:
    """Interface for objects that the archivers in this module can store.

    NOTE(review): the class does not use an ABC metaclass, so
    ``@abstractmethod`` is not enforced at instantiation time; both methods
    return ``None`` unless a subclass overrides them.
    """

    @abstractmethod
    def base_filename(self) -> str:
        """Return the name (without extension) used to file this object."""

    def contents(self, minimal: bool = False, **_kwargs: Any) -> Any:
        """Return the payload to archive; ``minimal`` requests a reduced form."""
        return None
class ArchiveTopic:
    """Archive notifications by publishing them to an MQTT topic.

    Every archived object is published to ``<topic>/<base_filename>``. The
    archiver starts disabled and is only enabled once :meth:`initialize`
    finds MQTT available.
    """

    def __init__(self, hass_api: HomeAssistantAPI, topic: str, qos: int = 0, retain: bool = True, debug: bool = False) -> None:
        """Store the publishing configuration.

        Args:
            hass_api: API wrapper used to check for and publish to MQTT.
            topic: Base topic; the object's base filename is appended to it.
            qos: MQTT quality-of-service level passed to each publish.
            retain: MQTT retain flag passed to each publish.
            debug: When true, full (non-minimal) contents are published.
        """
        self.hass_api: HomeAssistantAPI = hass_api
        self.topic: str = topic
        self.qos: int = qos
        self.retain: bool = retain
        self.debug: bool = debug
        self.enabled: bool = False

    async def initialize(self) -> None:
        """Enable the archiver if MQTT is reported as available."""
        if await self.hass_api.mqtt_available(raise_on_error=False):
            _LOGGER.info("SUPERNOTIFY Archiving to MQTT topic %s, qos %s, retain %s", self.topic, self.qos, self.retain)
            self.enabled = True

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Publish ``archive_object`` to its MQTT topic.

        Returns:
            ``True`` if the publish call completed, ``False`` if the archiver
            is disabled or anything went wrong building or sending the payload.
        """
        if not self.enabled:
            return False
        # Start from the base topic so the failure log is meaningful even if
        # the per-object topic could not be built.
        topic = self.topic
        try:
            # Building the payload and topic is inside the try so that a
            # failing object is reported as "not archived" rather than raising,
            # matching the best-effort behaviour of ArchiveDirectory.archive.
            payload = archive_object.contents(minimal=not self.debug)
            topic = f"{self.topic}/{archive_object.base_filename()}"
            _LOGGER.debug("SUPERNOTIFY Publishing notification to %s", topic)
            await self.hass_api.mqtt_publish(
                topic=topic,
                payload=payload,
                qos=self.qos,
                retain=self.retain,
            )
        except Exception as e:
            _LOGGER.warning("SUPERNOTIFY failed to archive to topic %s: %s", topic, e)
            return False
        return True
class ArchiveDirectory:
    """Archive notifications as JSON files in a file system directory.

    The archiver starts disabled; :meth:`initialize` creates the directory if
    needed, checks it is writable, and only then enables archiving.
    """

    def __init__(self, path: str, purge_minute_interval: int, debug: bool) -> None:
        """Store the directory configuration.

        Args:
            path: Configured location of the archive directory.
            purge_minute_interval: Minimum number of minutes between
                non-forced purges.
            debug: When true, full (non-minimal) contents are archived.
        """
        self.configured_path: str = path
        self.archive_path: anyio.Path | None = None  # set once the path is verified writable
        self.enabled: bool = False
        self.debug: bool = debug
        self.last_purge: dt.datetime | None = None  # UTC time of the last completed purge
        self.purge_minute_interval: int = purge_minute_interval

    async def initialize(self) -> None:
        """Create the archive directory if missing and verify it is writable.

        On success ``archive_path`` is set and the archiver is enabled;
        failures are logged as warnings and leave the archiver as it was.
        """
        verify_archive_path: anyio.Path = anyio.Path(self.configured_path)
        if verify_archive_path and not await verify_archive_path.exists():
            _LOGGER.info("SUPERNOTIFY archive path not found at %s", verify_archive_path)
            try:
                await verify_archive_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY archive path %s cannot be created: %s", verify_archive_path, e)
        if verify_archive_path and await verify_archive_path.exists() and await verify_archive_path.is_dir():
            try:
                # Touching a marker file proves the directory is writable.
                await verify_archive_path.joinpath(WRITE_TEST).touch(exist_ok=True)
                self.archive_path = verify_archive_path
                _LOGGER.info("SUPERNOTIFY archiving notifications to file system at %s", verify_archive_path)
                self.enabled = True
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY archive path %s cannot be written: %s", verify_archive_path, e)
        else:
            _LOGGER.warning("SUPERNOTIFY archive path %s is not a directory or does not exist", verify_archive_path)

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Write ``archive_object`` to ``<base_filename>.json`` in the archive.

        In debug mode the full contents are written first; if that fails, a
        second attempt is made with the minimal contents.

        Returns:
            ``True`` if a file was written, ``False`` otherwise (including
            when the archiver is disabled).
        """
        archived: bool = False

        if self.enabled and self.archive_path:  # archive_path to assuage mypy
            archive_path: str = ""
            try:
                filename = f"{archive_object.base_filename()}.json"
                archive_path = str(self.archive_path.joinpath(filename))
                # NOTE(review): save_json appears to be a synchronous call made
                # from a coroutine - confirm whether it should be offloaded.
                save_json(archive_path, archive_object.contents(minimal=not self.debug))
                _LOGGER.debug("SUPERNOTIFY Archived notification %s", archive_path)
                archived = True
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to archive notification: %s", e)
                # Only debug mode wrote the full contents, so only then is a
                # smaller fallback available. Skip it when no target path was
                # ever built, since the retry could not succeed.
                if self.debug and archive_path:
                    try:
                        # Must be minimal=True: repeating the full contents
                        # would just replay the failure above.
                        save_json(archive_path, archive_object.contents(minimal=True))
                        _LOGGER.warning("SUPERNOTIFY Archived minimal notification %s", archive_path)
                        archived = True
                    except Exception as e2:
                        _LOGGER.exception("SUPERNOTIFY Unable to archive minimal notification: %s", e2)
        return archived

    async def size(self) -> int:
        """Return the number of directory entries, excluding the marker file."""
        path = self.archive_path
        if path and await path.exists():
            return sum(1 for p in await aiofiles.os.listdir(path) if p != WRITE_TEST)
        return 0

    async def cleanup(self, days: int, force: bool) -> int:
        """Delete archive entries whose ``st_ctime`` is at least ``days`` old.

        Args:
            days: Age threshold in days; entries at or before the cutoff are
                removed.
            force: Purge even if the last purge was less than
                ``purge_minute_interval`` minutes ago.

        Returns:
            Number of entries deleted (0 when the purge is skipped).
        """
        now = dt.datetime.now(dt.UTC)
        if (
            not force
            and self.last_purge is not None
            and self.last_purge > now - dt.timedelta(minutes=self.purge_minute_interval)
        ):
            # Purged recently enough; nothing to do.
            return 0

        cutoff = now - dt.timedelta(days=days)
        purged = 0
        if self.archive_path and await self.archive_path.exists():
            try:
                archive = await aiofiles.os.scandir(self.archive_path)
                for entry in archive:
                    # Never purge the write-test marker created by initialize().
                    if entry.name == WRITE_TEST:
                        continue
                    if dt_util.utc_from_timestamp(entry.stat().st_ctime) <= cutoff:
                        _LOGGER.debug("SUPERNOTIFY Purging %s", entry.path)
                        await aiofiles.os.unlink(Path(entry.path))
                        purged += 1
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to clean up archive at %s: %s", self.archive_path, e, exc_info=True)
            _LOGGER.info("SUPERNOTIFY Purged %s archived notifications for cutoff %s", purged, cutoff)
            # Recorded even after a partial failure, so the interval throttle still applies.
            self.last_purge = dt.datetime.now(dt.UTC)
        else:
            _LOGGER.debug("SUPERNOTIFY Skipping archive purge for unknown path %s", self.archive_path)
        return purged
class NotificationArchive:
    """Facade over the optional directory and MQTT archivers.

    Settings are read from the configuration at construction; the archivers
    themselves are only created by :meth:`initialize`.
    """

    def __init__(self, config: ConfigType, hass_api: HomeAssistantAPI) -> None:
        """Read archive settings from ``config``, applying defaults."""
        self.hass_api = hass_api
        self.enabled = bool(config.get(CONF_ENABLED, False))
        self.debug: bool = bool(config.get(CONF_DEBUG, False))

        # Archivers stay unset until initialize() runs.
        self.archive_directory: ArchiveDirectory | None = None
        self.archive_topic: ArchiveTopic | None = None

        # File system archive settings.
        self.configured_archive_path: str | None = config.get(CONF_ARCHIVE_PATH)
        self.archive_days = int(config.get(CONF_ARCHIVE_DAYS, ARCHIVE_DEFAULT_DAYS))
        self.purge_minute_interval = int(config.get(CONF_ARCHIVE_PURGE_INTERVAL, ARCHIVE_PURGE_MIN_INTERVAL))

        # MQTT archive settings.
        self.mqtt_topic: str | None = config.get(CONF_ARCHIVE_MQTT_TOPIC)
        self.mqtt_qos: int = int(config.get(CONF_ARCHIVE_MQTT_QOS, 0))
        self.mqtt_retain: bool = bool(config.get(CONF_ARCHIVE_MQTT_RETAIN, True))

    async def initialize(self) -> None:
        """Create and initialize whichever archivers are configured."""
        if not self.enabled:
            _LOGGER.info("SUPERNOTIFY Archive disabled")
            return

        if self.configured_archive_path:
            self.archive_directory = ArchiveDirectory(
                self.configured_archive_path,
                purge_minute_interval=self.purge_minute_interval,
                debug=self.debug,
            )
            await self.archive_directory.initialize()
        else:
            _LOGGER.warning("SUPERNOTIFY archive path not configured")

        if self.mqtt_topic is not None:
            self.archive_topic = ArchiveTopic(self.hass_api, self.mqtt_topic, self.mqtt_qos, self.mqtt_retain, self.debug)
            await self.archive_topic.initialize()

    async def size(self) -> int:
        """Return the directory archive's entry count, or 0 without one."""
        if not self.archive_directory:
            return 0
        return await self.archive_directory.size()

    async def cleanup(self, days: int | None = None, force: bool = False) -> int:
        """Purge old entries from the directory archive.

        A falsy ``days`` (``None`` or ``0``) falls back to the configured
        ``archive_days``. Returns the number purged, or 0 without a directory
        archive.
        """
        effective_days = days if days else self.archive_days
        if not self.archive_directory:
            return 0
        return await self.archive_directory.cleanup(effective_days, force)

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Send ``archive_object`` to every configured archiver.

        Both archivers are always attempted; returns ``True`` if at least one
        of them reported success.
        """
        outcomes = []
        if self.archive_topic:
            outcomes.append(await self.archive_topic.archive(archive_object))
        if self.archive_directory:
            outcomes.append(await self.archive_directory.archive(archive_object))
        return any(outcomes)