Coverage for custom_components/supernotify/transports/email.py: 32%

171 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-01-07 15:35 +0000

1import logging 

2import os 

3import os.path 

4from typing import Any, TypedDict 

5 

6import aiofiles 

7from anyio import Path 

8from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET, ATTR_TITLE 

9from homeassistant.helpers.template import Template, TemplateError 

10from homeassistant.helpers.typing import ConfigType 

11 

12import custom_components.supernotify 

13from custom_components.supernotify import ( 

14 ATTR_ACTION_URL, 

15 ATTR_ACTION_URL_TITLE, 

16 ATTR_EMAIL, 

17 ATTR_MEDIA, 

18 ATTR_MEDIA_SNAPSHOT_URL, 

19 CONF_TEMPLATE, 

20 OPTION_JPEG, 

21 OPTION_MESSAGE_USAGE, 

22 OPTION_PNG, 

23 OPTION_SIMPLIFY_TEXT, 

24 OPTION_STRICT_TEMPLATE, 

25 OPTION_STRIP_URLS, 

26 OPTION_TARGET_CATEGORIES, 

27 TRANSPORT_EMAIL, 

28) 

29from custom_components.supernotify.context import Context 

30from custom_components.supernotify.envelope import Envelope 

31from custom_components.supernotify.hass_api import HomeAssistantAPI 

32from custom_components.supernotify.model import DebugTrace, DeliveryConfig, MessageOnlyPolicy, TransportConfig, TransportFeature 

33from custom_components.supernotify.transport import Transport 

34 

# Pattern for a syntactically acceptable e-mail address: a local part made of
# letters, digits and ``.+/=?^_-``, an ``@``, then a domain of at least two
# dot-separated labels, each 1-63 characters long, alphanumeric at both ends
# with hyphens permitted only in the interior.
RE_VALID_EMAIL = r"^[a-zA-Z0-9.+/=?^_-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+$"

# Delivery option keys used when padding the e-mail preheader text:
# the filler string to repeat, and the target length to pad up to.
OPTION_PREHEADER_BLANK = "preheader_blank"
OPTION_PREHEADER_LENGTH = "preheader_length"

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

42 

43 

class AlertServer(TypedDict):
    """Details of the originating server, exposed to e-mail templates."""

    # Display name of the server
    name: str
    # URL for reaching the server internally
    internal_url: str
    # URL for reaching the server externally
    external_url: str
    # Language setting of the server
    language: str

49 

50 

class AlertImage(TypedDict):
    """An image reference made available to e-mail templates."""

    # Where the image can be loaded from
    url: str
    # Human readable description of the image
    desc: str

54 

55 

class Alert(TypedDict):
    """Variables handed to an e-mail template under the name ``alert``."""

    # Body text of the notification, if any
    message: str | None
    # Title of the notification, if any
    title: str | None
    # Preview text for the mail
    preheader: str | None
    # Priority of the notification
    priority: str
    # The envelope being delivered
    envelope: Envelope
    # Optional link for the recipient to follow, and the label for that link
    action_url: str | None
    action_url_title: str | None
    # Secondary heading text
    subheading: str
    # Information about the sending server
    server: AlertServer
    # Ready-made HTML for the message, if any
    preformatted_html: str | None
    # Image to show in the mail, if any
    img: AlertImage | None

68 

69 

class EmailTransport(Transport):
    """Transport that sends notifications as e-mail through a notify action.

    The mail body is either the plain message (optionally with pre-formatted
    HTML and an inline image appended), or HTML rendered from a template file
    looked up in the custom and default template directories.
    """

    name = TRANSPORT_EMAIL

    def __init__(self, context: Context, transport_config: ConfigType | None = None) -> None:
        """Set up template search paths and an empty template cache.

        Args:
            context: Shared context; its ``custom_template_path`` (if set) is
                used as the root for user supplied templates.
            transport_config: Optional transport configuration passed to the
                base class.

        """
        super().__init__(context, transport_config)
        self.default_template_path: Path = Path(os.path.join(custom_components.supernotify.__path__[0], "default_templates"))  # noqa: PTH118
        self.custom_template_path: Path | None = None
        self.custom_email_template_path: Path | None = None
        # template file name -> template source text
        self.template_cache: dict[str, str] = {}

        try:
            if context.custom_template_path is not None:
                self.custom_template_path = Path(context.custom_template_path)
                # NOTE(review): this is a synchronous existence check, so it assumes
                # context.custom_template_path is a pathlib-style path rather than an
                # async path whose exists() would need awaiting - confirm
                if (context.custom_template_path / "email").exists():
                    self.custom_email_template_path = Path(context.custom_template_path / "email")
                else:
                    _LOGGER.debug("SUPERNOTIFY Email specific custom templates not configured")
            else:
                _LOGGER.info("SUPERNOTIFY Custom templates not configured")
                self.custom_template_path = None
        except Exception as e:
            # Best effort: a broken custom template path must not stop the transport loading
            _LOGGER.error("SUPERNOTIFY Failed to verify custom template path %s: %s", context.custom_template_path, e)

    def validate_action(self, action: str | None) -> bool:
        """Accept a delivery only if it names an action; there is no fixed default here."""
        return action is not None

    def auto_configure(self, hass_api: HomeAssistantAPI) -> DeliveryConfig | None:
        """Build a delivery config from an SMTP notify service, if one can be found.

        Returns:
            The delivery defaults with the discovered action set, or ``None``
            when no matching service is found.

        """
        action: str | None = hass_api.find_service("notify", "homeassistant.components.smtp.notify")
        if not action:
            return None
        # NOTE(review): this assigns onto the object returned by self.delivery_defaults
        # rather than a copy - confirm that sharing it is intended
        delivery_config: DeliveryConfig = self.delivery_defaults
        delivery_config.action = action
        return delivery_config

    @property
    def supported_features(self) -> TransportFeature:
        """Features this transport declares support for."""
        return (
            TransportFeature.MESSAGE
            | TransportFeature.TITLE
            | TransportFeature.ACTIONS
            | TransportFeature.IMAGES
            | TransportFeature.TEMPLATE_FILE
            | TransportFeature.SNAPSHOT_IMAGE
        )

    def extra_attributes(self) -> dict[str, Any]:
        """Report cached template names and the configured custom template locations."""
        return {
            "cached_templates": list(self.template_cache.keys()),
            "custom_templates": str(self.custom_template_path) if self.custom_template_path else None,
            "custom_email_templates": str(self.custom_email_template_path) if self.custom_email_template_path else None,
        }

    @property
    def default_config(self) -> TransportConfig:
        """Default transport configuration, including the default delivery options."""
        config = TransportConfig()
        config.delivery_defaults.options = {
            OPTION_SIMPLIFY_TEXT: False,
            OPTION_STRIP_URLS: False,
            OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD,
            OPTION_TARGET_CATEGORIES: [ATTR_EMAIL],
            # use sensible defaults for image attachments
            OPTION_JPEG: {"progressive": "true", "optimize": "true"},
            OPTION_PNG: {"optimize": "true"},
            OPTION_STRICT_TEMPLATE: False,
            # filler appended after the preheader text, and the length to pad up to
            OPTION_PREHEADER_BLANK: "͏‌ ",
            OPTION_PREHEADER_LENGTH: 100,
        }
        return config

    async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool:
        """Assemble the action payload for one envelope and call the notify action.

        Args:
            envelope: The notification to deliver.
            debug_trace: Optional trace, passed on to template rendering.

        Returns:
            The result of ``call_action``.

        """
        _LOGGER.debug("SUPERNOTIFY notify_email: %s %s", envelope.delivery_name, envelope.target.email)

        data: dict[str, Any] = envelope.data or {}
        html: str | None = None
        template_name: str | None = data.get(CONF_TEMPLATE, envelope.delivery.template)
        strict_template: bool = envelope.delivery.options.get(OPTION_STRICT_TEMPLATE, False)
        addresses: list[str] = envelope.target.email or []
        # tolerate the media key being present but set to None
        snapshot_url: str | None = (data.get(ATTR_MEDIA) or {}).get(ATTR_MEDIA_SNAPSHOT_URL)
        if snapshot_url is None:
            # older location for backward compatibility
            snapshot_url = data.get(ATTR_MEDIA_SNAPSHOT_URL)
        # TODO: centralize in config
        footer_template = data.get("footer")
        footer = footer_template.format(e=envelope) if footer_template else None

        action_data: dict[str, Any] = envelope.core_action_data()
        extra_data: dict[str, Any] = {k: v for k, v in data.items() if k not in action_data}

        if addresses:
            action_data[ATTR_TARGET] = addresses
        # default to SMTP platform default recipients if no explicit addresses

        nested_data = data.get("data")
        if nested_data:
            # shallow copy so that adding images/html below does not alter the envelope's own data
            action_data[ATTR_DATA] = dict(nested_data) if isinstance(nested_data, dict) else nested_data

        image_path: Path | None = await envelope.grab_image()
        if image_path:
            action_data.setdefault("data", {})
            action_data["data"]["images"] = [str(image_path)]

        if not template_name:
            if footer and action_data.get(ATTR_MESSAGE):
                action_data[ATTR_MESSAGE] = f"{action_data[ATTR_MESSAGE]}\n\n{footer}"

            if envelope.message_html:
                action_data.setdefault("data", {})
                html = envelope.message_html
                if image_path:
                    image_name = image_path.name
                    # only append the image if the html does not already reference it
                    # and is a fragment rather than a complete document
                    already_embedded = f"cid:{image_name}" in html
                    complete_document = html.rstrip().endswith("</html>")
                    if not already_embedded and not complete_document:
                        if snapshot_url:
                            html += f'<div><p><a href="{snapshot_url}">'
                            html += f'<img src="cid:{image_name}"/></a>'
                            html += "</p></div>"
                        else:
                            html += f'<div><p><img src="cid:{image_name}"></p></div>'

                action_data["data"]["html"] = html
        else:
            html = await self.render_template(
                template_name,
                envelope,
                action_data,
                debug_trace,
                image_path=image_path,
                snapshot_url=snapshot_url,
                extra_data=extra_data,
                strict_template=strict_template,
            )
            if html:
                action_data.setdefault("data", {})
                action_data["data"]["html"] = html
        return await self.call_action(envelope, action_data=action_data)

    async def load_template(self, template_name: str) -> str | None:
        """Return the source of a template, reading it from disk on first use.

        Search order is the custom e-mail template directory, the custom
        template directory, the default ``email`` directory, then the default
        template directory. Found templates are cached by name.

        Returns:
            The template text, or ``None`` if no file of that name exists.

        """
        if template_name in self.template_cache:
            return self.template_cache[template_name]

        for root_path in (
            self.custom_email_template_path,
            self.custom_template_path,
            self.default_template_path / "email",
            self.default_template_path,
        ):
            if root_path is None:
                continue
            template_path: Path = root_path / template_name
            if await template_path.exists():
                async with aiofiles.open(template_path) as file:
                    # read() keeps the file's own line breaks; joining readlines()
                    # with a separator would double every newline
                    template: str = await file.read()
                self.template_cache[template_name] = template
                return template
        return None

    async def render_template(
        self,
        template_name: str,
        envelope: Envelope,
        action_data: dict[str, Any],
        debug_trace: DebugTrace | None = None,
        image_path: Path | None = None,
        snapshot_url: str | None = None,
        extra_data: dict[str, Any] | None = None,
        strict_template: bool = False,
    ) -> str | None:
        """Render the named template to HTML for this envelope.

        Args:
            template_name: File name of the template to load.
            envelope: The notification being delivered.
            action_data: Action payload; title and message are read from it.
            debug_trace: Optional trace that receives the template variables
                and any exception raised.
            image_path: Local image, referenced as ``cid:<name>`` when there
                is no snapshot URL.
            snapshot_url: URL of a snapshot image, preferred over ``image_path``.
            extra_data: Additional data; the action URL and its title are read from it.
            strict_template: Passed to the template renderer as ``strict``.

        Returns:
            The rendered HTML, or ``None`` if the template is missing, renders
            to nothing, or rendering fails (failures are logged, not raised).

        """
        extra_data = extra_data or {}
        alert: Alert

        try:
            title: str | None = action_data.get(ATTR_TITLE)
            message: str | None = action_data.get(ATTR_MESSAGE)
            # join only the parts that exist, so a missing message is not rendered as "None"
            preheader: str = " ".join(part for part in (title, message) if part)
            preheader = preheader or "Home Assistant Notification"
            alert = Alert(
                message=message,
                title=title,
                preheader=self.pack_preheader(preheader, envelope.delivery.options),
                priority=envelope.priority,
                action_url=extra_data.get(ATTR_ACTION_URL),
                action_url_title=extra_data.get(ATTR_ACTION_URL_TITLE),
                envelope=envelope,
                subheading="Home Assistant Notification",
                server=AlertServer(
                    name=self.hass_api.hass_name,
                    internal_url=self.hass_api.internal_url,
                    external_url=self.hass_api.external_url,
                    language=self.hass_api.language,
                ),
                preformatted_html=envelope.message_html,
                img=None,
            )

            if snapshot_url:
                alert["img"] = AlertImage(url=snapshot_url, desc="Snapshot Image")
            elif image_path:
                alert["img"] = AlertImage(url=f"cid:{image_path.name}", desc=image_path.name)

            template_content: str | None = await self.load_template(template_name)

            if template_content is None:
                _LOGGER.error("SUPERNOTIFY No template found for %s", template_name)
                return None

            template_obj: Template = self.context.hass_api.template(template_content)
            template_obj.ensure_valid()

            if debug_trace:
                debug_trace.record_delivery_artefact(envelope.delivery.name, "alert", alert)

            html: str = template_obj.async_render(variables={"alert": alert}, parse_result=False, strict=strict_template)
            if html:
                return html
            _LOGGER.error("SUPERNOTIFY Empty result from template %s", template_name)
        except TemplateError as te:
            _LOGGER.exception("SUPERNOTIFY Failed to render template html mail: %s", te)
            if debug_trace:
                debug_trace.record_delivery_exception(envelope.delivery.name, "html_template", te)
        except Exception as e:
            _LOGGER.exception("SUPERNOTIFY Failed to generate html mail: %s", e)
            if debug_trace:
                debug_trace.record_delivery_exception(envelope.delivery.name, "html_template", e)
        return None

    def pack_preheader(self, preheader: str, options: dict[str, Any]) -> str:
        """Pad the preheader text with filler from the delivery options.

        The filler string (``preheader_blank``) is repeated once for every
        character the text falls short of ``preheader_length``. Text already at
        or beyond that length is returned unchanged, as is any text when either
        option is missing or empty.
        """
        preheader = preheader or ""
        phchars: str = options.get(OPTION_PREHEADER_BLANK, "")
        phlength: int = options.get(OPTION_PREHEADER_LENGTH, 0)
        if phlength and phchars:
            # a negative repeat count yields an empty string, so long text is left as is
            return f"{preheader}{phchars * (phlength - len(preheader))}"
        return preheader