Coverage for custom_components/supernotify/transports/email.py: 97%

150 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-01 15:06 +0000

1import logging 

2import os 

3import os.path 

4from typing import TYPE_CHECKING, Any, TypedDict 

5 

6import aiofiles 

7from anyio import Path 

8from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET, ATTR_TITLE 

9from homeassistant.helpers.template import Template, TemplateError 

10 

11import custom_components.supernotify 

12from custom_components.supernotify.const import ( 

13 ATTR_ACTION_URL, 

14 ATTR_ACTION_URL_TITLE, 

15 ATTR_EMAIL, 

16 ATTR_MEDIA, 

17 ATTR_MEDIA_SNAPSHOT_URL, 

18 CONF_TEMPLATE, 

19 OPTION_JPEG, 

20 OPTION_MESSAGE_USAGE, 

21 OPTION_PNG, 

22 OPTION_SIMPLIFY_TEXT, 

23 OPTION_STRICT_TEMPLATE, 

24 OPTION_STRIP_URLS, 

25 OPTION_TARGET_CATEGORIES, 

26 TRANSPORT_EMAIL, 

27) 

28from custom_components.supernotify.model import DebugTrace, DeliveryConfig, MessageOnlyPolicy, TransportConfig, TransportFeature 

29from custom_components.supernotify.transport import Transport 

30 

31if TYPE_CHECKING: 

32 from homeassistant.helpers.typing import ConfigType 

33 

34 from custom_components.supernotify.context import Context 

35 from custom_components.supernotify.envelope import Envelope 

36 from custom_components.supernotify.hass_api import HomeAssistantAPI 

37 

38RE_VALID_EMAIL = ( 

39 r"^[a-zA-Z0-9.+/=?^_-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+$" 

40) 

41OPTION_PREHEADER_BLANK = "preheader_blank" 

42OPTION_PREHEADER_LENGTH = "preheader_length" 

43 

44_LOGGER = logging.getLogger(__name__) 

45 

46 

47class AlertServer(TypedDict): 

48 name: str 

49 internal_url: str 

50 external_url: str 

51 language: str 

52 

53 

54class AlertImage(TypedDict): 

55 url: str 

56 desc: str 

57 

58 

59class Alert(TypedDict): 

60 message: str | None 

61 title: str | None 

62 preheader: str | None 

63 priority: str 

64 envelope: Envelope 

65 action_url: str | None 

66 action_url_title: str | None 

67 subheading: str 

68 server: AlertServer 

69 preformatted_html: str | None 

70 img: AlertImage | None 

71 

72 

73class EmailTransport(Transport): 

74 name = TRANSPORT_EMAIL 

75 

76 def __init__(self, context: Context, transport_config: ConfigType | None = None) -> None: 

77 super().__init__(context, transport_config) 

78 self.default_template_path: Path = Path(os.path.join(custom_components.supernotify.__path__[0], "default_templates")) # noqa: PTH118 

79 self.custom_template_path: Path | None = None 

80 self.custom_email_template_path: Path | None = None 

81 self.template_cache: dict[str, str] = {} 

82 

83 try: 

84 if context.custom_template_path is not None: 

85 self.custom_template_path = Path(context.custom_template_path) 

86 if (context.custom_template_path / "email").exists(): 

87 self.custom_email_template_path = Path(context.custom_template_path / "email") 

88 else: 

89 _LOGGER.debug("SUPERNOTIFY Email specific custom templates not configured") 

90 else: 

91 _LOGGER.info("SUPERNOTIFY Custom templates not configured") 

92 self.custom_template_path = None 

93 except Exception as e: 

94 _LOGGER.error("SUPERNOTIFY Failed to verify custom template path %s: %s", context.custom_template_path, e) 

95 

96 def validate_action(self, action: str | None) -> bool: 

97 """Override in subclass if transport has fixed action or doesn't require one""" 

98 return action is not None 

99 

100 def auto_configure(self, hass_api: HomeAssistantAPI) -> DeliveryConfig | None: 

101 action: str | None = hass_api.find_service("notify", "homeassistant.components.smtp.notify") 

102 if action: 

103 delivery_config: DeliveryConfig = self.delivery_defaults 

104 delivery_config.action = action 

105 return delivery_config 

106 return None 

107 

108 @property 

109 def supported_features(self) -> TransportFeature: 

110 return ( 

111 TransportFeature.MESSAGE 

112 | TransportFeature.TITLE 

113 | TransportFeature.ACTIONS 

114 | TransportFeature.IMAGES 

115 | TransportFeature.TEMPLATE_FILE 

116 | TransportFeature.SNAPSHOT_IMAGE 

117 ) 

118 

119 def extra_attributes(self) -> dict[str, Any]: 

120 return { 

121 "cached_templates": list(self.template_cache.keys()), 

122 "custom_templates": str(self.custom_template_path) if self.custom_template_path else None, 

123 "custom_email_templates": str(self.custom_email_template_path) if self.custom_email_template_path else None, 

124 } 

125 

126 @property 

127 def default_config(self) -> TransportConfig: 

128 config = TransportConfig() 

129 config.delivery_defaults.options = { 

130 OPTION_SIMPLIFY_TEXT: False, 

131 OPTION_STRIP_URLS: False, 

132 OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD, 

133 OPTION_TARGET_CATEGORIES: [ATTR_EMAIL], 

134 # use sensible defaults for image attachments 

135 OPTION_JPEG: {"progressive": "true", "optimize": "true"}, 

136 OPTION_PNG: {"optimize": "true"}, 

137 OPTION_STRICT_TEMPLATE: False, 

138 OPTION_PREHEADER_BLANK: "͏‌ ", 

139 OPTION_PREHEADER_LENGTH: 100, 

140 } 

141 return config 

142 

143 async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool: 

144 _LOGGER.debug("SUPERNOTIFY notify_email: %s %s", envelope.delivery_name, envelope.target.email) 

145 

146 data: dict[str, Any] = envelope.data or {} 

147 html: str | None = data.get("html") 

148 template_name: str | None = data.get(CONF_TEMPLATE, envelope.delivery.template) 

149 strict_template: bool = envelope.delivery.options.get(OPTION_STRICT_TEMPLATE, False) 

150 addresses: list[str] = envelope.target.email or [] 

151 snapshot_url: str | None = data.get(ATTR_MEDIA, {}).get(ATTR_MEDIA_SNAPSHOT_URL) 

152 if snapshot_url is None: 

153 # older location for backward compatibility 

154 snapshot_url = data.get(ATTR_MEDIA_SNAPSHOT_URL) 

155 # TODO: centralize in config 

156 footer_template = data.get("footer") 

157 footer = footer_template.format(e=envelope) if footer_template else None 

158 

159 action_data: dict[str, Any] = envelope.core_action_data() 

160 extra_data: dict[str, Any] = {k: v for k, v in data.items() if k not in action_data} 

161 

162 if len(addresses) > 0: 

163 action_data[ATTR_TARGET] = addresses 

164 # default to SMTP platform default recipients if no explicit addresses 

165 

166 if data and data.get("data"): 

167 action_data[ATTR_DATA] = data.get("data") 

168 

169 image_path: Path | None = await envelope.grab_image() 

170 if image_path: 

171 action_data.setdefault("data", {}) 

172 action_data["data"]["images"] = [str(image_path)] 

173 

174 if not template_name: 

175 if footer and action_data.get(ATTR_MESSAGE): 

176 action_data[ATTR_MESSAGE] = f"{action_data[ATTR_MESSAGE]}\n\n{footer}" 

177 

178 if envelope.message_html: 

179 action_data.setdefault("data", {}) 

180 html = envelope.message_html 

181 if image_path: 

182 image_name = image_path.name 

183 if html and "cid:%s" not in html and not html.endswith("</html"): 

184 if snapshot_url: 

185 html += f'<div><p><a href="{snapshot_url}">' 

186 html += f'<img src="cid:{image_name}"/></a>' 

187 html += "</p></div>" 

188 else: 

189 html += f'<div><p><img src="cid:{image_name}"></p></div>' 

190 

191 action_data["data"]["html"] = html 

192 else: 

193 html = await self.render_template( 

194 template_name, 

195 envelope, 

196 action_data, 

197 debug_trace, 

198 image_path=image_path, 

199 snapshot_url=snapshot_url, 

200 extra_data=extra_data, 

201 strict_template=strict_template, 

202 ) 

203 if html: 

204 action_data.setdefault("data", {}) 

205 action_data["data"]["html"] = html 

206 return await self.call_action(envelope, action_data=action_data) 

207 

208 async def load_template(self, template_name: str) -> str | None: 

209 if template_name in self.template_cache: 

210 return self.template_cache[template_name] 

211 

212 for root_path in ( 

213 self.custom_email_template_path, 

214 self.custom_template_path, 

215 self.default_template_path / "email", 

216 self.default_template_path, 

217 ): 

218 if root_path is not None: 

219 template_path: Path = root_path / template_name 

220 if await template_path.exists(): 

221 template: str 

222 async with aiofiles.open(template_path) as file: 

223 template = os.linesep.join(await file.readlines()) 

224 self.template_cache[template_name] = template 

225 return template 

226 return None 

227 

228 async def render_template( 

229 self, 

230 template_name: str, 

231 envelope: Envelope, 

232 action_data: dict[str, Any], 

233 debug_trace: DebugTrace | None = None, 

234 image_path: Path | None = None, 

235 snapshot_url: str | None = None, 

236 extra_data: dict[str, Any] | None = None, 

237 strict_template: bool = False, 

238 ) -> str | None: 

239 extra_data = extra_data or {} 

240 alert: Alert 

241 

242 try: 

243 title: str | None = action_data.get(ATTR_TITLE) 

244 message: str | None = action_data.get(ATTR_MESSAGE) 

245 preheader: str = f"{title or ''}{' ' if title else ''}{message}" 

246 preheader = preheader or "Home Assistant Notification" 

247 alert = Alert( 

248 message=message, 

249 title=title, 

250 preheader=self.pack_preheader(preheader, envelope.delivery.options), 

251 priority=envelope.priority, 

252 action_url=extra_data.get(ATTR_ACTION_URL), 

253 action_url_title=extra_data.get(ATTR_ACTION_URL_TITLE), 

254 envelope=envelope, 

255 subheading="Home Assistant Notification", 

256 server=AlertServer( 

257 name=self.hass_api.hass_name, 

258 internal_url=self.hass_api.internal_url, 

259 external_url=self.hass_api.external_url, 

260 language=self.hass_api.language, 

261 ), 

262 preformatted_html=envelope.message_html, 

263 img=None, 

264 ) 

265 

266 if snapshot_url: 

267 alert["img"] = AlertImage(url=snapshot_url, desc="Snapshot Image") 

268 elif image_path: 

269 alert["img"] = AlertImage(url=f"cid:{image_path.name}", desc=image_path.name) 

270 

271 template_content: str | None = await self.load_template(template_name) 

272 

273 if template_content is None: 

274 _LOGGER.error("SUPERNOTIFY No template found for %s", template_name) 

275 return None 

276 

277 template_obj: Template = self.context.hass_api.template(template_content) 

278 template_obj.ensure_valid() 

279 

280 if debug_trace: 

281 debug_trace.record_delivery_artefact(envelope.delivery.name, "alert", alert) 

282 

283 html: str = template_obj.async_render(variables={"alert": alert}, parse_result=False, strict=strict_template) 

284 if not html: 

285 _LOGGER.error("SUPERNOTIFY Empty result from template %s", template_name) 

286 else: 

287 return html 

288 except TemplateError as te: 

289 _LOGGER.exception("SUPERNOTIFY Failed to render template html mail: %s", te) 

290 if debug_trace: 

291 debug_trace.record_delivery_exception(envelope.delivery.name, "html_template", te) 

292 except Exception as e: 

293 _LOGGER.exception("SUPERNOTIFY Failed to generate html mail: %s", e) 

294 if debug_trace: 

295 debug_trace.record_delivery_exception(envelope.delivery.name, "html_template", e) 

296 return None 

297 

298 def pack_preheader(self, preheader: str, options: dict[str, Any]) -> str: 

299 preheader = preheader or "" 

300 phchars: str = options.get(OPTION_PREHEADER_BLANK, "") 

301 phlength: int = options.get(OPTION_PREHEADER_LENGTH, 0) 

302 if phlength and phchars: 

303 return f"{preheader}{phchars * (phlength - len(preheader))}" 

304 return preheader