Coverage for custom_components/supernotify/transports/email.py: 97%
150 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
1import logging
2import os
3import os.path
4from typing import TYPE_CHECKING, Any, TypedDict
6import aiofiles
7from anyio import Path
8from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET, ATTR_TITLE
9from homeassistant.helpers.template import Template, TemplateError
11import custom_components.supernotify
12from custom_components.supernotify.const import (
13 ATTR_ACTION_URL,
14 ATTR_ACTION_URL_TITLE,
15 ATTR_EMAIL,
16 ATTR_MEDIA,
17 ATTR_MEDIA_SNAPSHOT_URL,
18 CONF_TEMPLATE,
19 OPTION_JPEG,
20 OPTION_MESSAGE_USAGE,
21 OPTION_PNG,
22 OPTION_SIMPLIFY_TEXT,
23 OPTION_STRICT_TEMPLATE,
24 OPTION_STRIP_URLS,
25 OPTION_TARGET_CATEGORIES,
26 TRANSPORT_EMAIL,
27)
28from custom_components.supernotify.model import DebugTrace, DeliveryConfig, MessageOnlyPolicy, TransportConfig, TransportFeature
29from custom_components.supernotify.transport import Transport
31if TYPE_CHECKING:
32 from homeassistant.helpers.typing import ConfigType
34 from custom_components.supernotify.context import Context
35 from custom_components.supernotify.envelope import Envelope
36 from custom_components.supernotify.hass_api import HomeAssistantAPI
38RE_VALID_EMAIL = (
39 r"^[a-zA-Z0-9.+/=?^_-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+$"
40)
41OPTION_PREHEADER_BLANK = "preheader_blank"
42OPTION_PREHEADER_LENGTH = "preheader_length"
44_LOGGER = logging.getLogger(__name__)
47class AlertServer(TypedDict):
48 name: str
49 internal_url: str
50 external_url: str
51 language: str
54class AlertImage(TypedDict):
55 url: str
56 desc: str
59class Alert(TypedDict):
60 message: str | None
61 title: str | None
62 preheader: str | None
63 priority: str
64 envelope: Envelope
65 action_url: str | None
66 action_url_title: str | None
67 subheading: str
68 server: AlertServer
69 preformatted_html: str | None
70 img: AlertImage | None
73class EmailTransport(Transport):
74 name = TRANSPORT_EMAIL
76 def __init__(self, context: Context, transport_config: ConfigType | None = None) -> None:
77 super().__init__(context, transport_config)
78 self.default_template_path: Path = Path(os.path.join(custom_components.supernotify.__path__[0], "default_templates")) # noqa: PTH118
79 self.custom_template_path: Path | None = None
80 self.custom_email_template_path: Path | None = None
81 self.template_cache: dict[str, str] = {}
83 try:
84 if context.custom_template_path is not None:
85 self.custom_template_path = Path(context.custom_template_path)
86 if (context.custom_template_path / "email").exists():
87 self.custom_email_template_path = Path(context.custom_template_path / "email")
88 else:
89 _LOGGER.debug("SUPERNOTIFY Email specific custom templates not configured")
90 else:
91 _LOGGER.info("SUPERNOTIFY Custom templates not configured")
92 self.custom_template_path = None
93 except Exception as e:
94 _LOGGER.error("SUPERNOTIFY Failed to verify custom template path %s: %s", context.custom_template_path, e)
96 def validate_action(self, action: str | None) -> bool:
97 """Override in subclass if transport has fixed action or doesn't require one"""
98 return action is not None
100 def auto_configure(self, hass_api: HomeAssistantAPI) -> DeliveryConfig | None:
101 action: str | None = hass_api.find_service("notify", "homeassistant.components.smtp.notify")
102 if action:
103 delivery_config: DeliveryConfig = self.delivery_defaults
104 delivery_config.action = action
105 return delivery_config
106 return None
108 @property
109 def supported_features(self) -> TransportFeature:
110 return (
111 TransportFeature.MESSAGE
112 | TransportFeature.TITLE
113 | TransportFeature.ACTIONS
114 | TransportFeature.IMAGES
115 | TransportFeature.TEMPLATE_FILE
116 | TransportFeature.SNAPSHOT_IMAGE
117 )
119 def extra_attributes(self) -> dict[str, Any]:
120 return {
121 "cached_templates": list(self.template_cache.keys()),
122 "custom_templates": str(self.custom_template_path) if self.custom_template_path else None,
123 "custom_email_templates": str(self.custom_email_template_path) if self.custom_email_template_path else None,
124 }
126 @property
127 def default_config(self) -> TransportConfig:
128 config = TransportConfig()
129 config.delivery_defaults.options = {
130 OPTION_SIMPLIFY_TEXT: False,
131 OPTION_STRIP_URLS: False,
132 OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD,
133 OPTION_TARGET_CATEGORIES: [ATTR_EMAIL],
134 # use sensible defaults for image attachments
135 OPTION_JPEG: {"progressive": "true", "optimize": "true"},
136 OPTION_PNG: {"optimize": "true"},
137 OPTION_STRICT_TEMPLATE: False,
138 OPTION_PREHEADER_BLANK: "͏‌ ",
139 OPTION_PREHEADER_LENGTH: 100,
140 }
141 return config
143 async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool:
144 _LOGGER.debug("SUPERNOTIFY notify_email: %s %s", envelope.delivery_name, envelope.target.email)
146 data: dict[str, Any] = envelope.data or {}
147 html: str | None = data.get("html")
148 template_name: str | None = data.get(CONF_TEMPLATE, envelope.delivery.template)
149 strict_template: bool = envelope.delivery.options.get(OPTION_STRICT_TEMPLATE, False)
150 addresses: list[str] = envelope.target.email or []
151 snapshot_url: str | None = data.get(ATTR_MEDIA, {}).get(ATTR_MEDIA_SNAPSHOT_URL)
152 if snapshot_url is None:
153 # older location for backward compatibility
154 snapshot_url = data.get(ATTR_MEDIA_SNAPSHOT_URL)
155 # TODO: centralize in config
156 footer_template = data.get("footer")
157 footer = footer_template.format(e=envelope) if footer_template else None
159 action_data: dict[str, Any] = envelope.core_action_data()
160 extra_data: dict[str, Any] = {k: v for k, v in data.items() if k not in action_data}
162 if len(addresses) > 0:
163 action_data[ATTR_TARGET] = addresses
164 # default to SMTP platform default recipients if no explicit addresses
166 if data and data.get("data"):
167 action_data[ATTR_DATA] = data.get("data")
169 image_path: Path | None = await envelope.grab_image()
170 if image_path:
171 action_data.setdefault("data", {})
172 action_data["data"]["images"] = [str(image_path)]
174 if not template_name:
175 if footer and action_data.get(ATTR_MESSAGE):
176 action_data[ATTR_MESSAGE] = f"{action_data[ATTR_MESSAGE]}\n\n{footer}"
178 if envelope.message_html:
179 action_data.setdefault("data", {})
180 html = envelope.message_html
181 if image_path:
182 image_name = image_path.name
183 if html and "cid:%s" not in html and not html.endswith("</html"):
184 if snapshot_url:
185 html += f'<div><p><a href="{snapshot_url}">'
186 html += f'<img src="cid:{image_name}"/></a>'
187 html += "</p></div>"
188 else:
189 html += f'<div><p><img src="cid:{image_name}"></p></div>'
191 action_data["data"]["html"] = html
192 else:
193 html = await self.render_template(
194 template_name,
195 envelope,
196 action_data,
197 debug_trace,
198 image_path=image_path,
199 snapshot_url=snapshot_url,
200 extra_data=extra_data,
201 strict_template=strict_template,
202 )
203 if html:
204 action_data.setdefault("data", {})
205 action_data["data"]["html"] = html
206 return await self.call_action(envelope, action_data=action_data)
208 async def load_template(self, template_name: str) -> str | None:
209 if template_name in self.template_cache:
210 return self.template_cache[template_name]
212 for root_path in (
213 self.custom_email_template_path,
214 self.custom_template_path,
215 self.default_template_path / "email",
216 self.default_template_path,
217 ):
218 if root_path is not None:
219 template_path: Path = root_path / template_name
220 if await template_path.exists():
221 template: str
222 async with aiofiles.open(template_path) as file:
223 template = os.linesep.join(await file.readlines())
224 self.template_cache[template_name] = template
225 return template
226 return None
228 async def render_template(
229 self,
230 template_name: str,
231 envelope: Envelope,
232 action_data: dict[str, Any],
233 debug_trace: DebugTrace | None = None,
234 image_path: Path | None = None,
235 snapshot_url: str | None = None,
236 extra_data: dict[str, Any] | None = None,
237 strict_template: bool = False,
238 ) -> str | None:
239 extra_data = extra_data or {}
240 alert: Alert
242 try:
243 title: str | None = action_data.get(ATTR_TITLE)
244 message: str | None = action_data.get(ATTR_MESSAGE)
245 preheader: str = f"{title or ''}{' ' if title else ''}{message}"
246 preheader = preheader or "Home Assistant Notification"
247 alert = Alert(
248 message=message,
249 title=title,
250 preheader=self.pack_preheader(preheader, envelope.delivery.options),
251 priority=envelope.priority,
252 action_url=extra_data.get(ATTR_ACTION_URL),
253 action_url_title=extra_data.get(ATTR_ACTION_URL_TITLE),
254 envelope=envelope,
255 subheading="Home Assistant Notification",
256 server=AlertServer(
257 name=self.hass_api.hass_name,
258 internal_url=self.hass_api.internal_url,
259 external_url=self.hass_api.external_url,
260 language=self.hass_api.language,
261 ),
262 preformatted_html=envelope.message_html,
263 img=None,
264 )
266 if snapshot_url:
267 alert["img"] = AlertImage(url=snapshot_url, desc="Snapshot Image")
268 elif image_path:
269 alert["img"] = AlertImage(url=f"cid:{image_path.name}", desc=image_path.name)
271 template_content: str | None = await self.load_template(template_name)
273 if template_content is None:
274 _LOGGER.error("SUPERNOTIFY No template found for %s", template_name)
275 return None
277 template_obj: Template = self.context.hass_api.template(template_content)
278 template_obj.ensure_valid()
280 if debug_trace:
281 debug_trace.record_delivery_artefact(envelope.delivery.name, "alert", alert)
283 html: str = template_obj.async_render(variables={"alert": alert}, parse_result=False, strict=strict_template)
284 if not html:
285 _LOGGER.error("SUPERNOTIFY Empty result from template %s", template_name)
286 else:
287 return html
288 except TemplateError as te:
289 _LOGGER.exception("SUPERNOTIFY Failed to render template html mail: %s", te)
290 if debug_trace:
291 debug_trace.record_delivery_exception(envelope.delivery.name, "html_template", te)
292 except Exception as e:
293 _LOGGER.exception("SUPERNOTIFY Failed to generate html mail: %s", e)
294 if debug_trace:
295 debug_trace.record_delivery_exception(envelope.delivery.name, "html_template", e)
296 return None
298 def pack_preheader(self, preheader: str, options: dict[str, Any]) -> str:
299 preheader = preheader or ""
300 phchars: str = options.get(OPTION_PREHEADER_BLANK, "")
301 phlength: int = options.get(OPTION_PREHEADER_LENGTH, 0)
302 if phlength and phchars:
303 return f"{preheader}{phchars * (phlength - len(preheader))}"
304 return preheader