Coverage for custom_components/supernotify/transports/email.py: 32%
171 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
1import logging
2import os
3import os.path
4from typing import Any, TypedDict
6import aiofiles
7from anyio import Path
8from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET, ATTR_TITLE
9from homeassistant.helpers.template import Template, TemplateError
10from homeassistant.helpers.typing import ConfigType
12import custom_components.supernotify
13from custom_components.supernotify import (
14 ATTR_ACTION_URL,
15 ATTR_ACTION_URL_TITLE,
16 ATTR_EMAIL,
17 ATTR_MEDIA,
18 ATTR_MEDIA_SNAPSHOT_URL,
19 CONF_TEMPLATE,
20 OPTION_JPEG,
21 OPTION_MESSAGE_USAGE,
22 OPTION_PNG,
23 OPTION_SIMPLIFY_TEXT,
24 OPTION_STRICT_TEMPLATE,
25 OPTION_STRIP_URLS,
26 OPTION_TARGET_CATEGORIES,
27 TRANSPORT_EMAIL,
28)
29from custom_components.supernotify.context import Context
30from custom_components.supernotify.envelope import Envelope
31from custom_components.supernotify.hass_api import HomeAssistantAPI
32from custom_components.supernotify.model import DebugTrace, DeliveryConfig, MessageOnlyPolicy, TransportConfig, TransportFeature
33from custom_components.supernotify.transport import Transport
35RE_VALID_EMAIL = (
36 r"^[a-zA-Z0-9.+/=?^_-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+$"
37)
38OPTION_PREHEADER_BLANK = "preheader_blank"
39OPTION_PREHEADER_LENGTH = "preheader_length"
41_LOGGER = logging.getLogger(__name__)
44class AlertServer(TypedDict):
45 name: str
46 internal_url: str
47 external_url: str
48 language: str
51class AlertImage(TypedDict):
52 url: str
53 desc: str
56class Alert(TypedDict):
57 message: str | None
58 title: str | None
59 preheader: str | None
60 priority: str
61 envelope: Envelope
62 action_url: str | None
63 action_url_title: str | None
64 subheading: str
65 server: AlertServer
66 preformatted_html: str | None
67 img: AlertImage | None
70class EmailTransport(Transport):
71 name = TRANSPORT_EMAIL
73 def __init__(self, context: Context, transport_config: ConfigType | None = None) -> None:
74 super().__init__(context, transport_config)
75 self.default_template_path: Path = Path(os.path.join(custom_components.supernotify.__path__[0], "default_templates")) # noqa: PTH118
76 self.custom_template_path: Path | None = None
77 self.custom_email_template_path: Path | None = None
78 self.template_cache: dict[str, str] = {}
80 try:
81 if context.custom_template_path is not None:
82 self.custom_template_path = Path(context.custom_template_path)
83 if (context.custom_template_path / "email").exists():
84 self.custom_email_template_path = Path(context.custom_template_path / "email")
85 else:
86 _LOGGER.debug("SUPERNOTIFY Email specific custom templates not configured")
87 else:
88 _LOGGER.info("SUPERNOTIFY Custom templates not configured")
89 self.custom_template_path = None
90 except Exception as e:
91 _LOGGER.error("SUPERNOTIFY Failed to verify custom template path %s: %s", context.custom_template_path, e)
93 def validate_action(self, action: str | None) -> bool:
94 """Override in subclass if transport has fixed action or doesn't require one"""
95 return action is not None
97 def auto_configure(self, hass_api: HomeAssistantAPI) -> DeliveryConfig | None:
98 action: str | None = hass_api.find_service("notify", "homeassistant.components.smtp.notify")
99 if action:
100 delivery_config: DeliveryConfig = self.delivery_defaults
101 delivery_config.action = action
102 return delivery_config
103 return None
105 @property
106 def supported_features(self) -> TransportFeature:
107 return (
108 TransportFeature.MESSAGE
109 | TransportFeature.TITLE
110 | TransportFeature.ACTIONS
111 | TransportFeature.IMAGES
112 | TransportFeature.TEMPLATE_FILE
113 | TransportFeature.SNAPSHOT_IMAGE
114 )
116 def extra_attributes(self) -> dict[str, Any]:
117 return {
118 "cached_templates": list(self.template_cache.keys()),
119 "custom_templates": str(self.custom_template_path) if self.custom_template_path else None,
120 "custom_email_templates": str(self.custom_email_template_path) if self.custom_email_template_path else None,
121 }
123 @property
124 def default_config(self) -> TransportConfig:
125 config = TransportConfig()
126 config.delivery_defaults.options = {
127 OPTION_SIMPLIFY_TEXT: False,
128 OPTION_STRIP_URLS: False,
129 OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD,
130 OPTION_TARGET_CATEGORIES: [ATTR_EMAIL],
131 # use sensible defaults for image attachments
132 OPTION_JPEG: {"progressive": "true", "optimize": "true"},
133 OPTION_PNG: {"optimize": "true"},
134 OPTION_STRICT_TEMPLATE: False,
135 OPTION_PREHEADER_BLANK: "͏‌ ",
136 OPTION_PREHEADER_LENGTH: 100,
137 }
138 return config
140 async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool:
141 _LOGGER.debug("SUPERNOTIFY notify_email: %s %s", envelope.delivery_name, envelope.target.email)
143 data: dict[str, Any] = envelope.data or {}
144 html: str | None = data.get("html")
145 template_name: str | None = data.get(CONF_TEMPLATE, envelope.delivery.template)
146 strict_template: bool = envelope.delivery.options.get(OPTION_STRICT_TEMPLATE, False)
147 addresses: list[str] = envelope.target.email or []
148 snapshot_url: str | None = data.get(ATTR_MEDIA, {}).get(ATTR_MEDIA_SNAPSHOT_URL)
149 if snapshot_url is None:
150 # older location for backward compatibility
151 snapshot_url = data.get(ATTR_MEDIA_SNAPSHOT_URL)
152 # TODO: centralize in config
153 footer_template = data.get("footer")
154 footer = footer_template.format(e=envelope) if footer_template else None
156 action_data: dict[str, Any] = envelope.core_action_data()
157 extra_data: dict[str, Any] = {k: v for k, v in data.items() if k not in action_data}
159 if len(addresses) > 0:
160 action_data[ATTR_TARGET] = addresses
161 # default to SMTP platform default recipients if no explicit addresses
163 if data and data.get("data"):
164 action_data[ATTR_DATA] = data.get("data")
166 image_path: Path | None = await envelope.grab_image()
167 if image_path:
168 action_data.setdefault("data", {})
169 action_data["data"]["images"] = [str(image_path)]
171 if not template_name:
172 if footer and action_data.get(ATTR_MESSAGE):
173 action_data[ATTR_MESSAGE] = f"{action_data[ATTR_MESSAGE]}\n\n{footer}"
175 if envelope.message_html:
176 action_data.setdefault("data", {})
177 html = envelope.message_html
178 if image_path:
179 image_name = image_path.name
180 if html and "cid:%s" not in html and not html.endswith("</html"):
181 if snapshot_url:
182 html += f'<div><p><a href="{snapshot_url}">'
183 html += f'<img src="cid:{image_name}"/></a>'
184 html += "</p></div>"
185 else:
186 html += f'<div><p><img src="cid:{image_name}"></p></div>'
188 action_data["data"]["html"] = html
189 else:
190 html = await self.render_template(
191 template_name,
192 envelope,
193 action_data,
194 debug_trace,
195 image_path=image_path,
196 snapshot_url=snapshot_url,
197 extra_data=extra_data,
198 strict_template=strict_template,
199 )
200 if html:
201 action_data.setdefault("data", {})
202 action_data["data"]["html"] = html
203 return await self.call_action(envelope, action_data=action_data)
205 async def load_template(self, template_name: str) -> str | None:
206 if template_name in self.template_cache:
207 return self.template_cache[template_name]
209 for root_path in (
210 self.custom_email_template_path,
211 self.custom_template_path,
212 self.default_template_path / "email",
213 self.default_template_path,
214 ):
215 if root_path is not None:
216 template_path: Path = root_path / template_name
217 if await template_path.exists():
218 template: str
219 async with aiofiles.open(template_path) as file:
220 template = os.linesep.join(await file.readlines())
221 self.template_cache[template_name] = template
222 return template
223 return None
225 async def render_template(
226 self,
227 template_name: str,
228 envelope: Envelope,
229 action_data: dict[str, Any],
230 debug_trace: DebugTrace | None = None,
231 image_path: Path | None = None,
232 snapshot_url: str | None = None,
233 extra_data: dict[str, Any] | None = None,
234 strict_template: bool = False,
235 ) -> str | None:
236 extra_data = extra_data or {}
237 alert: Alert
239 try:
240 title: str | None = action_data.get(ATTR_TITLE)
241 message: str | None = action_data.get(ATTR_MESSAGE)
242 preheader: str = f"{title or ''}{' ' if title else ''}{message}"
243 preheader = preheader or "Home Assistant Notification"
244 alert = Alert(
245 message=message,
246 title=title,
247 preheader=self.pack_preheader(preheader, envelope.delivery.options),
248 priority=envelope.priority,
249 action_url=extra_data.get(ATTR_ACTION_URL),
250 action_url_title=extra_data.get(ATTR_ACTION_URL_TITLE),
251 envelope=envelope,
252 subheading="Home Assistant Notification",
253 server=AlertServer(
254 name=self.hass_api.hass_name,
255 internal_url=self.hass_api.internal_url,
256 external_url=self.hass_api.external_url,
257 language=self.hass_api.language,
258 ),
259 preformatted_html=envelope.message_html,
260 img=None,
261 )
263 if snapshot_url:
264 alert["img"] = AlertImage(url=snapshot_url, desc="Snapshot Image")
265 elif image_path:
266 alert["img"] = AlertImage(url=f"cid:{image_path.name}", desc=image_path.name)
268 template_content: str | None = await self.load_template(template_name)
270 if template_content is None:
271 _LOGGER.error("SUPERNOTIFY No template found for %s", template_name)
272 return None
274 template_obj: Template = self.context.hass_api.template(template_content)
275 template_obj.ensure_valid()
277 if debug_trace:
278 debug_trace.record_delivery_artefact(envelope.delivery.name, "alert", alert)
280 html: str = template_obj.async_render(variables={"alert": alert}, parse_result=False, strict=strict_template)
281 if not html:
282 _LOGGER.error("SUPERNOTIFY Empty result from template %s", template_name)
283 else:
284 return html
285 except TemplateError as te:
286 _LOGGER.exception("SUPERNOTIFY Failed to render template html mail: %s", te)
287 if debug_trace:
288 debug_trace.record_delivery_exception(envelope.delivery.name, "html_template", te)
289 except Exception as e:
290 _LOGGER.exception("SUPERNOTIFY Failed to generate html mail: %s", e)
291 if debug_trace:
292 debug_trace.record_delivery_exception(envelope.delivery.name, "html_template", e)
293 return None
295 def pack_preheader(self, preheader: str, options: dict[str, Any]) -> str:
296 preheader = preheader or ""
297 phchars: str = options.get(OPTION_PREHEADER_BLANK, "")
298 phlength: int = options.get(OPTION_PREHEADER_LENGTH, 0)
299 if phlength and phchars:
300 return f"{preheader}{phchars * (phlength - len(preheader))}"
301 return preheader