Coverage for custom_components/supernotify/archive.py: 87%
158 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
1import datetime as dt
2import logging
3from abc import abstractmethod
4from pathlib import Path
5from typing import Any
7import aiofiles.os
8import anyio
9import homeassistant.util.dt as dt_util
10from homeassistant.const import CONF_ENABLED
11from homeassistant.helpers import condition as condition
12from homeassistant.helpers.json import save_json
13from homeassistant.helpers.typing import ConfigType
15from custom_components.supernotify.hass_api import HomeAssistantAPI
17from . import (
18 CONF_ARCHIVE_DAYS,
19 CONF_ARCHIVE_MQTT_QOS,
20 CONF_ARCHIVE_MQTT_RETAIN,
21 CONF_ARCHIVE_MQTT_TOPIC,
22 CONF_ARCHIVE_PATH,
23 CONF_ARCHIVE_PURGE_INTERVAL,
24 CONF_DEBUG,
25)
# Module-level logger shared by every archive backend in this file.
_LOGGER = logging.getLogger(__name__)

# Default for CONF_ARCHIVE_PURGE_INTERVAL. The value ends up in
# ``timedelta(minutes=...)`` inside ArchiveDirectory.cleanup, so it is in
# minutes (3 * 60 = 180 minutes between non-forced purges).
ARCHIVE_PURGE_MIN_INTERVAL = 3 * 60
# Default for CONF_ARCHIVE_DAYS: archived files at or beyond this age are purged.
ARCHIVE_DEFAULT_DAYS = 1
# Marker file touched in the archive directory to prove it is writable;
# it is excluded from ArchiveDirectory.size().
WRITE_TEST = ".startup"
class ArchivableObject:
    """Interface for objects that can be handed to an archive backend.

    NOTE(review): this class does not derive from ``abc.ABC``, so the
    ``@abstractmethod`` marker on ``base_filename`` is not enforced when
    the class (or a subclass) is instantiated.
    """

    @abstractmethod
    def base_filename(self) -> str:
        """Return the name under which this object is archived.

        Backends append their own suffix or prefix to it (``.json`` for the
        directory backend, the base topic for the MQTT backend). The base
        implementation returns ``None``.
        """

    def contents(self, minimal: bool = False) -> Any:
        """Return the payload to archive.

        Args:
            minimal: Presumably requests a reduced payload; the meaning is
                defined by implementations, not here.

        The base implementation returns ``None``.
        """
class ArchiveTopic:
    """Archive backend that publishes archivable objects to an MQTT topic."""

    def __init__(self, hass_api: HomeAssistantAPI, topic: str, qos: int = 0, retain: bool = True, debug: bool = False) -> None:
        """Store the publishing configuration; nothing is published until ``initialize`` succeeds.

        Args:
            hass_api: API wrapper used to check MQTT availability and to publish.
            topic: Base topic; each object is published to ``<topic>/<base_filename>``.
            qos: MQTT quality-of-service level passed to the publish call.
            retain: MQTT retain flag passed to the publish call.
            debug: Forwarded as ``minimal`` to ``ArchivableObject.contents``.
        """
        self.hass_api: HomeAssistantAPI = hass_api
        self.topic: str = topic
        self.qos: int = qos
        self.retain: bool = retain
        self.debug: bool = debug
        # Stays False until initialize() confirms MQTT is available.
        self.enabled: bool = False

    async def initialize(self) -> None:
        """Enable the backend if the MQTT integration reports itself available."""
        if await self.hass_api.mqtt_available(raise_on_error=False):
            _LOGGER.info(
                "SUPERNOTIFY Archiving to MQTT topic %s, qos %s, retain %s",
                self.topic,
                self.qos,
                self.retain,
            )
            self.enabled = True

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Publish ``archive_object`` under this backend's base topic.

        Returns:
            True if the publish call completed, False if the backend is
            disabled or the publish raised.
        """
        if not self.enabled:
            return False
        payload = archive_object.contents(minimal=self.debug)
        topic = f"{self.topic}/{archive_object.base_filename()}"
        _LOGGER.debug("SUPERNOTIFY Publishing notification to %s", topic)
        try:
            await self.hass_api.mqtt_publish(
                topic=topic,
                payload=payload,
                qos=self.qos,
                retain=self.retain,
            )
        except Exception as e:
            # Best-effort: archiving must not break notification delivery.
            # Report the full topic and the cause so the failure can be diagnosed.
            _LOGGER.warning("SUPERNOTIFY failed to archive to topic %s: %s", topic, e)
            return False
        return True
76class ArchiveDirectory:
77 def __init__(self, path: str, purge_minute_interval: int, debug: bool) -> None:
78 self.configured_path: str = path
79 self.archive_path: anyio.Path | None = None
80 self.enabled: bool = False
81 self.debug: bool = debug
82 self.last_purge: dt.datetime | None = None
83 self.purge_minute_interval: int = purge_minute_interval
85 async def initialize(self) -> None:
86 verify_archive_path: anyio.Path = anyio.Path(self.configured_path)
87 if verify_archive_path and not await verify_archive_path.exists():
88 _LOGGER.info("SUPERNOTIFY archive path not found at %s", verify_archive_path)
89 try:
90 await verify_archive_path.mkdir(parents=True, exist_ok=True)
91 except Exception as e:
92 _LOGGER.warning("SUPERNOTIFY archive path %s cannot be created: %s", verify_archive_path, e)
93 if verify_archive_path and await verify_archive_path.exists() and await verify_archive_path.is_dir():
94 try:
95 await verify_archive_path.joinpath(WRITE_TEST).touch(exist_ok=True)
96 self.archive_path = verify_archive_path
97 _LOGGER.info("SUPERNOTIFY archiving notifications to file system at %s", verify_archive_path)
98 self.enabled = True
99 except Exception as e:
100 _LOGGER.warning("SUPERNOTIFY archive path %s cannot be written: %s", verify_archive_path, e)
101 else:
102 _LOGGER.warning("SUPERNOTIFY archive path %s is not a directory or does not exist", verify_archive_path)
104 async def archive(self, archive_object: ArchivableObject) -> bool:
105 archived: bool = False
107 if self.enabled and self.archive_path: # archive_path to assuage mypy
108 archive_path: str = ""
109 try:
110 filename = f"{archive_object.base_filename()}.json"
111 archive_path = str(self.archive_path.joinpath(filename))
112 save_json(archive_path, archive_object.contents(minimal=self.debug))
113 _LOGGER.debug("SUPERNOTIFY Archived notification %s", archive_path)
114 archived = True
115 except Exception as e:
116 _LOGGER.warning("SUPERNOTIFY Unable to archive notification: %s", e)
117 if self.debug:
118 try:
119 save_json(archive_path, archive_object.contents(minimal=self.debug))
120 _LOGGER.warning("SUPERNOTIFY Archived minimal notification %s", archive_path)
121 archived = True
122 except Exception as e2:
123 _LOGGER.exception("SUPERNOTIFY Unable to archive minimal notification: %s", e2)
124 return archived
126 async def size(self) -> int:
127 path = self.archive_path
128 if path and await path.exists():
129 return sum(1 for p in await aiofiles.os.listdir(path) if p != WRITE_TEST)
130 return 0
132 async def cleanup(self, days: int, force: bool) -> int:
133 if (
134 not force
135 and self.last_purge is not None
136 and self.last_purge > dt.datetime.now(dt.UTC) - dt.timedelta(minutes=self.purge_minute_interval)
137 ):
138 return 0
140 cutoff = dt.datetime.now(dt.UTC) - dt.timedelta(days=days)
141 cutoff = cutoff.astimezone(dt.UTC)
142 purged = 0
143 if self.archive_path and await self.archive_path.exists():
144 try:
145 archive = await aiofiles.os.scandir(self.archive_path)
146 for entry in archive:
147 if entry.name == ".startup":
148 continue
149 if dt_util.utc_from_timestamp(entry.stat().st_ctime) <= cutoff:
150 _LOGGER.debug("SUPERNOTIFY Purging %s", entry.path)
151 await aiofiles.os.unlink(Path(entry.path))
152 purged += 1
153 except Exception as e:
154 _LOGGER.warning("SUPERNOTIFY Unable to clean up archive at %s: %s", self.archive_path, e, exc_info=True)
155 _LOGGER.info("SUPERNOTIFY Purged %s archived notifications for cutoff %s", purged, cutoff)
156 self.last_purge = dt.datetime.now(dt.UTC)
157 else:
158 _LOGGER.debug("SUPERNOTIFY Skipping archive purge for unknown path %s", self.archive_path)
159 return purged
class NotificationArchive:
    """Facade over the optional directory and MQTT archive backends."""

    def __init__(
        self,
        config: ConfigType,
        hass_api: HomeAssistantAPI,
    ) -> None:
        """Read the archive settings from ``config``; backends are built in ``initialize``.

        Args:
            config: Archive configuration mapping; missing keys fall back to defaults.
            hass_api: API wrapper handed to the MQTT backend.
        """
        self.hass_api = hass_api
        self.enabled = bool(config.get(CONF_ENABLED, False))
        self.archive_directory: ArchiveDirectory | None = None
        self.archive_topic: ArchiveTopic | None = None
        self.configured_archive_path: str | None = config.get(CONF_ARCHIVE_PATH)
        self.archive_days = int(config.get(CONF_ARCHIVE_DAYS, ARCHIVE_DEFAULT_DAYS))
        self.mqtt_topic: str | None = config.get(CONF_ARCHIVE_MQTT_TOPIC)
        self.mqtt_qos: int = int(config.get(CONF_ARCHIVE_MQTT_QOS, 0))
        self.mqtt_retain: bool = bool(config.get(CONF_ARCHIVE_MQTT_RETAIN, True))
        self.debug: bool = bool(config.get(CONF_DEBUG, False))

        self.purge_minute_interval = int(config.get(CONF_ARCHIVE_PURGE_INTERVAL, ARCHIVE_PURGE_MIN_INTERVAL))

    async def initialize(self) -> None:
        """Create and initialize whichever backends are configured.

        Does nothing except log when archiving is disabled. A missing archive
        path only produces a warning; the MQTT backend is still set up when a
        topic is configured.
        """
        if not self.enabled:
            _LOGGER.info("SUPERNOTIFY Archive disabled")
            return
        if not self.configured_archive_path:
            _LOGGER.warning("SUPERNOTIFY archive path not configured")
        else:
            self.archive_directory = ArchiveDirectory(
                self.configured_archive_path, purge_minute_interval=self.purge_minute_interval, debug=self.debug
            )
            await self.archive_directory.initialize()

        if self.mqtt_topic is not None:
            self.archive_topic = ArchiveTopic(self.hass_api, self.mqtt_topic, self.mqtt_qos, self.mqtt_retain, self.debug)
            await self.archive_topic.initialize()

    async def size(self) -> int:
        """Return the directory backend's entry count, or 0 when there is no directory backend."""
        return await self.archive_directory.size() if self.archive_directory else 0

    async def cleanup(self, days: int | None = None, force: bool = False) -> int:
        """Purge old entries from the directory backend.

        Args:
            days: Age threshold in days; ``None`` selects the configured
                ``archive_days``. An explicit ``0`` is honoured as given.
            force: Forwarded to the directory backend's ``cleanup``.

        Returns:
            Number of entries purged, or 0 when there is no directory backend.
        """
        # Compare against None rather than relying on truthiness, so that an
        # explicit days=0 is not silently replaced by the configured default.
        if days is None:
            days = self.archive_days
        return await self.archive_directory.cleanup(days, force) if self.archive_directory else 0

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Send ``archive_object`` to every available backend.

        Returns:
            True if at least one backend reported success.
        """
        archived: bool = False
        if self.archive_topic:
            if await self.archive_topic.archive(archive_object):
                archived = True
        if self.archive_directory:
            if await self.archive_directory.archive(archive_object):
                archived = True

        return archived