Coverage for custom_components/supernotify/archive.py: 24%

158 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-02-06 15:56 +0000

1import datetime as dt 

2import logging 

3from abc import abstractmethod 

4from pathlib import Path 

5from typing import Any 

6 

7import aiofiles.os 

8import anyio 

9import homeassistant.util.dt as dt_util 

10from homeassistant.const import ( 

11 CONF_DEBUG, 

12 CONF_ENABLED, 

13) 

14from homeassistant.helpers import condition as condition 

15from homeassistant.helpers.json import save_json 

16from homeassistant.helpers.typing import ConfigType 

17 

18from custom_components.supernotify.hass_api import HomeAssistantAPI 

19 

20from .const import ( 

21 CONF_ARCHIVE_DAYS, 

22 CONF_ARCHIVE_MQTT_QOS, 

23 CONF_ARCHIVE_MQTT_RETAIN, 

24 CONF_ARCHIVE_MQTT_TOPIC, 

25 CONF_ARCHIVE_PATH, 

26 CONF_ARCHIVE_PURGE_INTERVAL, 

27) 

28 

29_LOGGER = logging.getLogger(__name__) 

30 

31ARCHIVE_PURGE_MIN_INTERVAL = 3 * 60 

32ARCHIVE_DEFAULT_DAYS = 1 

33WRITE_TEST = ".startup" 

34 

35 

class ArchivableObject:
    """Interface for objects that can be handed to the archive classes.

    The class does not derive from ``abc.ABC``, so the ``@abstractmethod`` marker on
    ``base_filename`` is informational only: instantiation is not blocked and both
    methods return ``None`` unless a subclass overrides them.
    """

    @abstractmethod
    def base_filename(self) -> str:
        """Return the base name this object should be archived under.

        Subclasses are expected to override this; the default body returns None.
        """

    def contents(self, minimal: bool = False, **_kwargs: Any) -> Any:
        """Return the payload to archive.

        Args:
            minimal: Flag passed by archivers to ask for the minimal form of the payload.
            **_kwargs: Accepted and ignored by this base implementation.

        Returns:
            ``None`` in this base implementation.

        """
        return None

43 

44 

class ArchiveTopic:
    """Archive notifications by publishing them to an MQTT topic.

    Each object is published to ``<topic>/<base_filename>``. The archiver starts
    disabled and is only enabled once ``initialize`` finds MQTT to be available.
    """

    def __init__(self, hass_api: HomeAssistantAPI, topic: str, qos: int = 0, retain: bool = True, debug: bool = False) -> None:
        """Store the MQTT publishing settings.

        Args:
            hass_api: API wrapper used to check for and publish to MQTT.
            topic: Base topic; the object's base filename is appended per message.
            qos: QoS value passed through to the publish call.
            retain: Retain flag passed through to the publish call.
            debug: When true, the full (non-minimal) contents are published.

        """
        self.hass_api: HomeAssistantAPI = hass_api
        self.topic: str = topic
        self.qos: int = qos
        self.retain: bool = retain
        self.debug: bool = debug
        self.enabled: bool = False

    async def initialize(self) -> None:
        """Enable publishing if the API reports MQTT as available."""
        if await self.hass_api.mqtt_available(raise_on_error=False):
            _LOGGER.info(
                "SUPERNOTIFY Archiving to MQTT topic %s, qos %s, retain %s", self.topic, self.qos, self.retain
            )
            self.enabled = True

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Publish the object's contents to its own sub-topic.

        Best effort: any failure while building or publishing the payload is logged
        and reported as ``False`` rather than raised, so that other archive targets
        can still be attempted by the caller.

        Returns:
            True if the payload was published, False if disabled or on any failure.

        """
        if not self.enabled:
            return False
        # Falls back to the base topic in the failure log if the full topic can't be built
        topic: str = self.topic
        try:
            payload = archive_object.contents(minimal=not self.debug)
            topic = f"{self.topic}/{archive_object.base_filename()}"
            _LOGGER.debug("SUPERNOTIFY Publishing notification to %s", topic)
            await self.hass_api.mqtt_publish(
                topic=topic,
                payload=payload,
                qos=self.qos,
                retain=self.retain,
            )
        except Exception as e:
            # Report the topic actually attempted, and why it failed
            _LOGGER.warning("SUPERNOTIFY failed to archive to topic %s: %s", topic, e)
            return False
        return True

76 

77 

class ArchiveDirectory:
    """Archive notifications as JSON files in a directory on the file system.

    The archiver starts disabled; ``initialize`` enables it once the configured
    directory exists (creating it if needed) and a marker file can be written to it.
    """

    def __init__(self, path: str, purge_minute_interval: int, debug: bool) -> None:
        """Store the directory settings.

        Args:
            path: Configured location of the archive directory.
            purge_minute_interval: Minimum minutes between non-forced purges.
            debug: When true, the full (non-minimal) contents are archived.

        """
        self.configured_path: str = path
        # Only set once initialize() has verified the directory is writable
        self.archive_path: anyio.Path | None = None
        self.enabled: bool = False
        self.debug: bool = debug
        self.last_purge: dt.datetime | None = None
        self.purge_minute_interval: int = purge_minute_interval

    async def initialize(self) -> None:
        """Create the archive directory if missing and verify it is writable."""
        verify_archive_path: anyio.Path = anyio.Path(self.configured_path)
        if not await verify_archive_path.exists():
            _LOGGER.info("SUPERNOTIFY archive path not found at %s", verify_archive_path)
            try:
                await verify_archive_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY archive path %s cannot be created: %s", verify_archive_path, e)
        if await verify_archive_path.exists() and await verify_archive_path.is_dir():
            try:
                # Touching the marker file proves the directory is writable
                await verify_archive_path.joinpath(WRITE_TEST).touch(exist_ok=True)
                self.archive_path = verify_archive_path
                _LOGGER.info("SUPERNOTIFY archiving notifications to file system at %s", verify_archive_path)
                self.enabled = True
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY archive path %s cannot be written: %s", verify_archive_path, e)
        else:
            _LOGGER.warning("SUPERNOTIFY archive path %s is not a directory or does not exist", verify_archive_path)

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Write the object's contents to ``<base_filename>.json`` in the archive directory.

        In debug mode the full contents are written first; if that fails, a second
        attempt is made with the minimal contents. Outside debug mode the first
        attempt is already minimal, so there is no retry.

        Returns:
            True if either attempt wrote the file, otherwise False. Never raises.

        """
        archived: bool = False

        if self.enabled and self.archive_path:  # archive_path to assuage mypy
            archive_path: str = ""
            try:
                filename = f"{archive_object.base_filename()}.json"
                archive_path = str(self.archive_path.joinpath(filename))
                save_json(archive_path, archive_object.contents(minimal=not self.debug))
                _LOGGER.debug("SUPERNOTIFY Archived notification %s", archive_path)
                archived = True
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to archive notification: %s", e)
                # Retry only makes sense when the failed attempt was the full payload
                # (debug mode) and a target file name was successfully derived.
                if self.debug and archive_path:
                    try:
                        save_json(archive_path, archive_object.contents(minimal=True))
                        _LOGGER.warning("SUPERNOTIFY Archived minimal notification %s", archive_path)
                        archived = True
                    except Exception as e2:
                        _LOGGER.exception("SUPERNOTIFY Unable to archive minimal notification: %s", e2)
        return archived

    async def size(self) -> int:
        """Return the number of entries in the archive directory, ignoring the marker file."""
        path = self.archive_path
        if path and await path.exists():
            return sum(1 for p in await aiofiles.os.listdir(path) if p != WRITE_TEST)
        return 0

    async def cleanup(self, days: int, force: bool) -> int:
        """Delete archive entries whose ``st_ctime`` is at or before the cutoff.

        Args:
            days: Age in days; entries at least this old are removed.
            force: Purge even if the last purge was within ``purge_minute_interval``.

        Returns:
            Number of entries removed; 0 if the purge was skipped or no directory is set.

        """
        if (
            not force
            and self.last_purge is not None
            and self.last_purge > dt.datetime.now(dt.UTC) - dt.timedelta(minutes=self.purge_minute_interval)
        ):
            return 0

        cutoff = dt.datetime.now(dt.UTC) - dt.timedelta(days=days)
        purged = 0
        if self.archive_path and await self.archive_path.exists():
            try:
                # Context manager closes the directory iterator even if the loop fails part way
                with await aiofiles.os.scandir(self.archive_path) as archive:
                    for entry in archive:
                        if entry.name == WRITE_TEST:
                            continue
                        if dt_util.utc_from_timestamp(entry.stat().st_ctime) <= cutoff:
                            _LOGGER.debug("SUPERNOTIFY Purging %s", entry.path)
                            await aiofiles.os.unlink(Path(entry.path))
                            purged += 1
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to clean up archive at %s: %s", self.archive_path, e, exc_info=True)
            _LOGGER.info("SUPERNOTIFY Purged %s archived notifications for cutoff %s", purged, cutoff)
            # Recorded even after a partial failure, so a broken archive isn't retried on every call
            self.last_purge = dt.datetime.now(dt.UTC)
        else:
            _LOGGER.debug("SUPERNOTIFY Skipping archive purge for unknown path %s", self.archive_path)
        return purged

162 

163 

class NotificationArchive:
    """Front end for notification archiving, to a directory, an MQTT topic, or both.

    Settings are read from configuration at construction; the directory and topic
    archivers themselves are only created by ``initialize``, and only when the
    archive is enabled.
    """

    def __init__(
        self,
        config: ConfigType,
        hass_api: HomeAssistantAPI,
    ) -> None:
        """Read archive settings from ``config``, applying defaults for missing keys."""
        self.hass_api = hass_api
        # Archivers stay unset until initialize() creates them
        self.archive_directory: ArchiveDirectory | None = None
        self.archive_topic: ArchiveTopic | None = None

        self.enabled = bool(config.get(CONF_ENABLED, False))
        self.configured_archive_path: str | None = config.get(CONF_ARCHIVE_PATH)
        self.archive_days = int(config.get(CONF_ARCHIVE_DAYS, ARCHIVE_DEFAULT_DAYS))
        self.mqtt_topic: str | None = config.get(CONF_ARCHIVE_MQTT_TOPIC)
        self.mqtt_qos: int = int(config.get(CONF_ARCHIVE_MQTT_QOS, 0))
        self.mqtt_retain: bool = bool(config.get(CONF_ARCHIVE_MQTT_RETAIN, True))
        self.debug: bool = bool(config.get(CONF_DEBUG, False))
        self.purge_minute_interval = int(config.get(CONF_ARCHIVE_PURGE_INTERVAL, ARCHIVE_PURGE_MIN_INTERVAL))

    async def initialize(self) -> None:
        """Create and initialize the configured archivers, if archiving is enabled."""
        if not self.enabled:
            _LOGGER.info("SUPERNOTIFY Archive disabled")
            return

        if self.configured_archive_path:
            directory = ArchiveDirectory(
                self.configured_archive_path, purge_minute_interval=self.purge_minute_interval, debug=self.debug
            )
            self.archive_directory = directory
            await directory.initialize()
        else:
            _LOGGER.warning("SUPERNOTIFY archive path not configured")

        if self.mqtt_topic is None:
            return
        topic = ArchiveTopic(self.hass_api, self.mqtt_topic, self.mqtt_qos, self.mqtt_retain, self.debug)
        self.archive_topic = topic
        await topic.initialize()

    async def size(self) -> int:
        """Return the directory archive's entry count, or 0 without a directory archiver."""
        directory = self.archive_directory
        if not directory:
            return 0
        return await directory.size()

    async def cleanup(self, days: int | None = None, force: bool = False) -> int:
        """Purge old entries from the directory archive.

        Args:
            days: Age threshold in days; ``None`` or ``0`` falls back to the configured value.
            force: Passed through to the directory archiver's cleanup.

        Returns:
            Number of entries purged, or 0 without a directory archiver.

        """
        effective_days = days if days else self.archive_days
        directory = self.archive_directory
        if not directory:
            return 0
        return await directory.cleanup(effective_days, force)

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Send the object to every available archiver, topic first and then directory.

        Returns:
            True if at least one archiver reported success.

        """
        succeeded: bool = False
        # Every target is attempted; an earlier success never skips a later target
        for target in (self.archive_topic, self.archive_directory):
            if target and await target.archive(archive_object):
                succeeded = True
        return succeeded