Coverage for custom_components/supernotify/archive.py: 87%

158 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-21 23:31 +0000

1import datetime as dt 

2import logging 

3from abc import abstractmethod 

4from pathlib import Path 

5from typing import Any 

6 

7import aiofiles.os 

8import anyio 

9import homeassistant.util.dt as dt_util 

10from homeassistant.const import CONF_ENABLED 

11from homeassistant.helpers import condition as condition 

12from homeassistant.helpers.json import save_json 

13from homeassistant.helpers.typing import ConfigType 

14 

15from custom_components.supernotify.hass_api import HomeAssistantAPI 

16 

17from . import ( 

18 CONF_ARCHIVE_DAYS, 

19 CONF_ARCHIVE_MQTT_QOS, 

20 CONF_ARCHIVE_MQTT_RETAIN, 

21 CONF_ARCHIVE_MQTT_TOPIC, 

22 CONF_ARCHIVE_PATH, 

23 CONF_ARCHIVE_PURGE_INTERVAL, 

24 CONF_DEBUG, 

25) 

26 

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Default number of minutes between archive purges (3 * 60 = 180), used when
# CONF_ARCHIVE_PURGE_INTERVAL is absent from the configuration.
ARCHIVE_PURGE_MIN_INTERVAL = 3 * 60
# Default retention, in days, used when CONF_ARCHIVE_DAYS is absent from the configuration.
ARCHIVE_DEFAULT_DAYS = 1
# Name of the marker file touched in the archive directory to prove it is writable;
# it is excluded when counting archived items.
WRITE_TEST = ".startup"

32 

33 

class ArchivableObject:
    """Contract for objects that can be handed to an archive backend.

    The class does not derive from ``abc.ABC``, so the ``@abstractmethod``
    marker on :meth:`base_filename` is informational only: instances can still
    be created, and both methods return ``None`` unless overridden.
    """

    @abstractmethod
    def base_filename(self) -> str:
        """Return the extension-less name used for the archive file or sub-topic."""

    def contents(self, minimal: bool = False) -> Any:
        """Return the payload to archive.

        Args:
            minimal: Flag forwarded by the archive backends; its meaning is
                defined by the overriding class.

        Returns:
            ``None`` in this base implementation.
        """
        return None

41 

42 

class ArchiveTopic:
    """MQTT archive backend that publishes each archived object under a base topic."""

    def __init__(self, hass_api: HomeAssistantAPI, topic: str, qos: int = 0, retain: bool = True, debug: bool = False) -> None:
        """Store the publishing parameters; the backend stays disabled until `initialize` succeeds.

        Args:
            hass_api: API wrapper used to check for and publish to MQTT.
            topic: Base topic; the object's base filename is appended per message.
            qos: QoS value passed through to the publish call.
            retain: Retain flag passed through to the publish call.
            debug: Forwarded as the ``minimal`` argument of ``contents()``.
        """
        self.hass_api: HomeAssistantAPI = hass_api
        self.topic: str = topic
        self.qos: int = qos
        self.retain: bool = retain
        self.debug: bool = debug
        self.enabled: bool = False

    async def initialize(self) -> None:
        """Enable the backend if the API reports that MQTT is available."""
        if await self.hass_api.mqtt_available(raise_on_error=False):
            _LOGGER.info(
                "SUPERNOTIFY Archiving to MQTT topic %s, qos %s, retain %s",
                self.topic,
                self.qos,
                self.retain,
            )
            self.enabled = True

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Publish the object's contents to ``<topic>/<base_filename>``.

        Returns:
            True if the publish call completed, False if the backend is
            disabled or publishing raised.
        """
        if not self.enabled:
            return False
        payload = archive_object.contents(minimal=self.debug)
        topic = f"{self.topic}/{archive_object.base_filename()}"
        _LOGGER.debug("SUPERNOTIFY Publishing notification to %s", topic)
        try:
            await self.hass_api.mqtt_publish(
                topic=topic,
                payload=payload,
                qos=self.qos,
                retain=self.retain,
            )
        except Exception as e:
            # Archiving is best effort, so never propagate; but report the full
            # topic and the cause so the failure can actually be diagnosed.
            _LOGGER.warning("SUPERNOTIFY failed to archive to topic %s: %s", topic, e)
            return False
        return True

74 

75 

class ArchiveDirectory:
    """File-system archive backend that writes one JSON file per archived object."""

    def __init__(self, path: str, purge_minute_interval: int, debug: bool) -> None:
        """Record the configuration; the backend stays disabled until `initialize` succeeds.

        Args:
            path: Configured directory for archive files.
            purge_minute_interval: Minimum minutes between non-forced purges.
            debug: Forwarded as the ``minimal`` argument of ``contents()`` and
                enables a second write attempt when the first one fails.
        """
        self.configured_path: str = path
        # Only set once initialize() has verified the directory is writable.
        self.archive_path: anyio.Path | None = None
        self.enabled: bool = False
        self.debug: bool = debug
        self.last_purge: dt.datetime | None = None
        self.purge_minute_interval: int = purge_minute_interval

    async def initialize(self) -> None:
        """Create the directory if needed and enable the backend if it is writable."""
        verify_archive_path: anyio.Path = anyio.Path(self.configured_path)
        if not await verify_archive_path.exists():
            _LOGGER.info("SUPERNOTIFY archive path not found at %s", verify_archive_path)
            try:
                await verify_archive_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY archive path %s cannot be created: %s", verify_archive_path, e)
        if await verify_archive_path.exists() and await verify_archive_path.is_dir():
            try:
                # Touching a marker file proves the directory is writable.
                await verify_archive_path.joinpath(WRITE_TEST).touch(exist_ok=True)
                self.archive_path = verify_archive_path
                _LOGGER.info("SUPERNOTIFY archiving notifications to file system at %s", verify_archive_path)
                self.enabled = True
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY archive path %s cannot be written: %s", verify_archive_path, e)
        else:
            _LOGGER.warning("SUPERNOTIFY archive path %s is not a directory or does not exist", verify_archive_path)

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Write the object's contents to ``<archive_path>/<base_filename>.json``.

        The synchronous ``save_json`` call runs in a worker thread so that file
        I/O does not block the event loop.

        Returns:
            True if a file was written, False if the backend is disabled or
            every write attempt failed.
        """
        archived: bool = False

        if self.enabled and self.archive_path:  # archive_path to assuage mypy
            archive_path: str = ""
            try:
                filename = f"{archive_object.base_filename()}.json"
                archive_path = str(self.archive_path.joinpath(filename))
                await anyio.to_thread.run_sync(save_json, archive_path, archive_object.contents(minimal=self.debug))
                _LOGGER.debug("SUPERNOTIFY Archived notification %s", archive_path)
                archived = True
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to archive notification: %s", e)
                if self.debug:
                    # NOTE(review): this retry passes the same arguments as the
                    # first attempt (debug is necessarily True here) - confirm
                    # whether the first attempt was meant to be non-minimal.
                    try:
                        await anyio.to_thread.run_sync(
                            save_json, archive_path, archive_object.contents(minimal=self.debug)
                        )
                        _LOGGER.warning("SUPERNOTIFY Archived minimal notification %s", archive_path)
                        archived = True
                    except Exception as e2:
                        _LOGGER.exception("SUPERNOTIFY Unable to archive minimal notification: %s", e2)
        return archived

    async def size(self) -> int:
        """Return the number of directory entries, not counting the write-test marker."""
        path = self.archive_path
        if path and await path.exists():
            return sum(1 for p in await aiofiles.os.listdir(path) if p != WRITE_TEST)
        return 0

    async def cleanup(self, days: int, force: bool) -> int:
        """Delete archive entries whose ``st_ctime`` is at or before the cutoff.

        Args:
            days: Age threshold in days.
            force: Purge even if the last purge was within ``purge_minute_interval``.

        Returns:
            Number of entries deleted; 0 when throttled or when there is no
            usable archive path.
        """
        now = dt.datetime.now(dt.UTC)
        if (
            not force
            and self.last_purge is not None
            and self.last_purge > now - dt.timedelta(minutes=self.purge_minute_interval)
        ):
            return 0

        cutoff = now - dt.timedelta(days=days)
        purged = 0
        if self.archive_path and await self.archive_path.exists():
            try:
                entries = await aiofiles.os.scandir(self.archive_path)
                try:
                    for entry in entries:
                        if entry.name == WRITE_TEST:
                            continue
                        # st_ctime is creation time on Windows but last metadata change on Unix.
                        if dt_util.utc_from_timestamp(entry.stat().st_ctime) <= cutoff:
                            _LOGGER.debug("SUPERNOTIFY Purging %s", entry.path)
                            await aiofiles.os.unlink(Path(entry.path))
                            purged += 1
                finally:
                    # Release the directory handle even if the loop aborted early;
                    # getattr keeps this tolerant of plain iterables without close().
                    close = getattr(entries, "close", None)
                    if callable(close):
                        close()
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to clean up archive at %s: %s", self.archive_path, e, exc_info=True)
            _LOGGER.info("SUPERNOTIFY Purged %s archived notifications for cutoff %s", purged, cutoff)
            # Recorded even after a failed purge, so failures are throttled too.
            self.last_purge = dt.datetime.now(dt.UTC)
        else:
            _LOGGER.debug("SUPERNOTIFY Skipping archive purge for unknown path %s", self.archive_path)
        return purged

160 

161 

class NotificationArchive:
    """Facade over the optional directory and MQTT archive backends."""

    def __init__(
        self,
        config: ConfigType,
        hass_api: HomeAssistantAPI,
    ) -> None:
        """Read archive settings from ``config``; backends are created in `initialize`."""
        self.hass_api = hass_api
        self.enabled = bool(config.get(CONF_ENABLED, False))
        self.debug: bool = bool(config.get(CONF_DEBUG, False))

        # Backends stay unset until initialize() has run with archiving enabled.
        self.archive_directory: ArchiveDirectory | None = None
        self.archive_topic: ArchiveTopic | None = None

        # Directory backend settings
        self.configured_archive_path: str | None = config.get(CONF_ARCHIVE_PATH)
        self.archive_days = int(config.get(CONF_ARCHIVE_DAYS, ARCHIVE_DEFAULT_DAYS))
        self.purge_minute_interval = int(config.get(CONF_ARCHIVE_PURGE_INTERVAL, ARCHIVE_PURGE_MIN_INTERVAL))

        # MQTT backend settings
        self.mqtt_topic: str | None = config.get(CONF_ARCHIVE_MQTT_TOPIC)
        self.mqtt_qos: int = int(config.get(CONF_ARCHIVE_MQTT_QOS, 0))
        self.mqtt_retain: bool = bool(config.get(CONF_ARCHIVE_MQTT_RETAIN, True))

    async def initialize(self) -> None:
        """Create and initialize whichever backends are configured; no-op when disabled."""
        if not self.enabled:
            _LOGGER.info("SUPERNOTIFY Archive disabled")
            return

        if self.configured_archive_path:
            directory = ArchiveDirectory(
                self.configured_archive_path, purge_minute_interval=self.purge_minute_interval, debug=self.debug
            )
            self.archive_directory = directory
            await directory.initialize()
        else:
            _LOGGER.warning("SUPERNOTIFY archive path not configured")

        if self.mqtt_topic is not None:
            topic = ArchiveTopic(self.hass_api, self.mqtt_topic, self.mqtt_qos, self.mqtt_retain, self.debug)
            self.archive_topic = topic
            await topic.initialize()

    async def size(self) -> int:
        """Return the directory backend's item count, or 0 when there is no directory backend."""
        if not self.archive_directory:
            return 0
        return await self.archive_directory.size()

    async def cleanup(self, days: int | None = None, force: bool = False) -> int:
        """Purge old entries from the directory backend.

        Args:
            days: Age threshold; ``None`` or ``0`` falls back to the configured
                ``archive_days``.
            force: Passed through to the directory backend's cleanup.

        Returns:
            Number of purged entries, or 0 when there is no directory backend.
        """
        effective_days = days or self.archive_days
        if not self.archive_directory:
            return 0
        return await self.archive_directory.cleanup(effective_days, force)

    async def archive(self, archive_object: ArchivableObject) -> bool:
        """Offer the object to every backend, topic first, then directory.

        Returns:
            True if at least one backend reported success.
        """
        outcomes: list[bool] = []
        for backend in (self.archive_topic, self.archive_directory):
            if backend:
                outcomes.append(bool(await backend.archive(archive_object)))
        return any(outcomes)