Coverage for custom_components/supernotify/transports/generic.py: 97%
208 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
1import logging
2from dataclasses import dataclass, field
3from typing import TYPE_CHECKING, Any
5from homeassistant.components.notify import DOMAIN as NOTIFY_DOMAIN
6from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET, ATTR_TITLE
8# ATTR_VARIABLES from script.const has import issues
9from homeassistant.const import ATTR_ENTITY_ID
11from custom_components.supernotify.common import ensure_list
12from custom_components.supernotify.const import (
13 ATTR_ACTION_URL,
14 ATTR_ACTIONS,
15 ATTR_MEDIA,
16 ATTR_MEDIA_SNAPSHOT_URL,
17 ATTR_PRIORITY,
18 OPTION_DATA_KEYS_SELECT,
19 OPTION_GENERIC_DOMAIN_STYLE,
20 OPTION_MESSAGE_USAGE,
21 OPTION_RAW,
22 OPTION_SIMPLIFY_TEXT,
23 OPTION_STRIP_URLS,
24 OPTION_TARGET_CATEGORIES,
25 PRIORITY_CRITICAL,
26 PRIORITY_HIGH,
27 PRIORITY_LOW,
28 PRIORITY_MEDIUM,
29 PRIORITY_MINIMUM,
30 PRIORITY_VALUES,
31 TRANSPORT_GENERIC,
32)
33from custom_components.supernotify.model import (
34 DebugTrace,
35 MessageOnlyPolicy,
36 SelectionRule,
37 Target,
38 TargetRequired,
39 TransportConfig,
40 TransportFeature,
41)
42from custom_components.supernotify.transport import (
43 Transport,
44)
46if TYPE_CHECKING:
47 from custom_components.supernotify.delivery import Delivery
48 from custom_components.supernotify.envelope import Envelope
49 from custom_components.supernotify.hass_api import HomeAssistantAPI
51_LOGGER = logging.getLogger(__name__)
52"""
53Replaced by reuse of original service schema to prune out fields
55DATA_FIELDS_ALLOWED_BY_DOMAIN = {
56 "light": [
57 "transition",
58 "rgb_color",
59 "color_temp_kelvin",
60 "brightness_pct",
61 "brightness_step_pct",
62 "effect",
63 "rgbw_color",
64 "rgbww_color",
65 "color_name",
66 "hs_color",
67 "xy_color",
68 "color_temp",
69 "brightness",
70 "brightness_step",
71 "white",
72 "profile",
73 "flash",
74 ],
75 "siren": ["tone", "duration", "volume_level"],
76 "mqtt": ["topic", "payload", "evaluate_payload", "qos", "retain"],
77 "script": ["variables", "wait", "wait_template"],
78 "ntfy": [
79 "title",
80 "message",
81 "markdown",
82 "tags",
83 "priority",
84 "click",
85 "delay",
86 "attach",
87 "attach_file",
88 "filename",
89 "email",
90 "call",
91 "icon",
92 "action",
93 "sequence_id",
94 ],
95 "tts": ["cache", "options", "message", "language", "media_player_entity_id", "entity_id", "target"],
96} """
99class GenericTransport(Transport):
100 """Call any service, including non-notify ones, like switch.turn_on or mqtt.publish"""
102 name = TRANSPORT_GENERIC
104 def __init__(self, *args: Any, **kwargs: Any) -> None:
105 super().__init__(*args, **kwargs)
107 @property
108 def supported_features(self) -> TransportFeature:
109 return TransportFeature.MESSAGE | TransportFeature.TITLE
111 @property
112 def default_config(self) -> TransportConfig:
113 config = TransportConfig()
114 config.delivery_defaults.target_required = TargetRequired.OPTIONAL
115 config.delivery_defaults.options = {
116 OPTION_SIMPLIFY_TEXT: False,
117 OPTION_STRIP_URLS: False,
118 OPTION_RAW: False,
119 OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD,
120 OPTION_DATA_KEYS_SELECT: None,
121 OPTION_GENERIC_DOMAIN_STYLE: None,
122 }
123 return config
125 def validate_action(self, action: str | None) -> bool:
126 if action is not None and "." in action:
127 return True
128 _LOGGER.warning("SUPERNOTIFY generic transport must have a qualified action name, e.g. notify.foo")
129 return False
131 async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool: # noqa: ARG002
132 # inputs
133 data: dict[str, Any] = envelope.data or {}
134 core_action_data: dict[str, Any] = envelope.core_action_data(force_message=False)
135 raw_mode: bool = envelope.delivery.options.get(OPTION_RAW, False)
136 qualified_action: str | None = envelope.delivery.action
137 split_action = (
138 qualified_action.split(".", 1) if qualified_action and "." in qualified_action else [None, qualified_action]
139 )
140 domain: str | None = split_action[0]
141 service: str | None = split_action[1]
143 equiv_domain: str | None = domain
144 if envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE):
145 equiv_domain = envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE)
146 _LOGGER.debug("SUPERNOTIFY Handling %s generic message as if it was %s", domain, equiv_domain)
148 # outputs
149 action_data: dict[str, Any] = {}
150 target_data: dict[str, Any] | None = {}
151 build_targets: bool = False
152 prune_data: bool = True
153 mini_envelopes: list[MiniEnvelope] = [] # only used for script and ntfy
155 if raw_mode:
156 action_data = core_action_data
157 action_data.update(data)
158 build_targets = True
159 elif equiv_domain == "notify":
160 action_data = core_action_data
161 if qualified_action == "notify.send_message":
162 # amongst the wild west of notifty handling, at least care for the modern core one
163 action_data = core_action_data
164 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
165 prune_data = False
166 else:
167 action_data = core_action_data
168 action_data[ATTR_DATA] = data
169 build_targets = True
170 elif equiv_domain == "input_text":
171 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
172 if "value" in data:
173 action_data = {"value": data["value"]}
174 else:
175 action_data = {"value": core_action_data[ATTR_MESSAGE]}
176 elif equiv_domain == "switch":
177 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
178 elif equiv_domain == "mqtt":
179 action_data = data
180 if "payload" not in action_data:
181 action_data["payload"] = envelope.message
182 # add `payload:` with empty value for empty topic
183 elif equiv_domain == "tts":
184 action_data = core_action_data
185 action_data.update(data)
186 build_targets = True
187 elif equiv_domain == "notify_events":
188 mini_envelopes.extend(notify_events(envelope.message, envelope.title, core_action_data, data, envelope.delivery))
189 elif qualified_action == "ntfy.publish":
190 mini_envelopes.extend(ntfy(core_action_data, data, envelope.target, envelope.delivery, self.hass_api))
191 elif equiv_domain in ("siren", "light"):
192 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
193 action_data = data
194 elif equiv_domain == "rest_command":
195 action_data = data
196 elif equiv_domain == "script":
197 mini_envelopes.extend(
198 script(qualified_action, core_action_data, data, envelope.target, envelope.delivery, self.hass_api)
199 )
200 else:
201 action_data = core_action_data
202 action_data.update(data)
203 build_targets = True
205 if mini_envelopes:
206 results: list[bool] = [
207 await self.call_action(
208 envelope, qualified_action, action_data=mini_envelope.action_data, target_data=mini_envelope.target_data
209 )
210 for mini_envelope in mini_envelopes
211 ]
212 return all(results)
214 if build_targets:
215 all_targets: list[str] = []
216 if OPTION_TARGET_CATEGORIES in envelope.delivery.options:
217 for category in ensure_list(envelope.delivery.options.get(OPTION_TARGET_CATEGORIES, [])):
218 all_targets.extend(envelope.target.for_category(category))
219 else:
220 all_targets = envelope.target.resolved_targets()
221 if len(all_targets) == 1:
222 action_data[ATTR_TARGET] = all_targets[0]
223 elif len(all_targets) >= 1:
224 action_data[ATTR_TARGET] = all_targets
226 if prune_data and action_data:
227 action_data = customize_data(action_data, envelope.delivery)
228 if not raw_mode and domain and service:
229 # use the service schema to remove unsupported fields or force type
230 action_data = self.context.hass_api.coerce_schema(domain, service, action_data)
232 return await self.call_action(envelope, qualified_action, action_data=action_data, target_data=target_data or None)
235def customize_data(data: dict[str, Any], delivery: Delivery) -> dict[str, Any]:
236 if not data:
237 return data
238 top_selection_rule: SelectionRule | None = None
239 if delivery.options.get(OPTION_DATA_KEYS_SELECT):
240 top_selection_rule = SelectionRule(delivery.options.get(OPTION_DATA_KEYS_SELECT))
241 if top_selection_rule is None:
242 pruned: dict[str, Any] = data
243 else:
244 pruned = {}
245 for key in data:
246 if top_selection_rule is None or top_selection_rule.match(key):
247 pruned[key] = data[key]
248 if ATTR_DATA in pruned and not pruned[ATTR_DATA]:
249 # tidy up empty nested `data` maps
250 del pruned[ATTR_DATA]
252 return pruned
255@dataclass
256class MiniEnvelope:
257 action_data: dict[str, Any] = field(default_factory=dict)
258 target_data: dict[str, Any] | None = None
261def script(
262 qualified_action: str | None,
263 core_action_data: dict[str, Any],
264 data: dict[str, Any],
265 target: Target,
266 delivery: Delivery,
267 hass_api: HomeAssistantAPI,
268) -> list[MiniEnvelope]:
269 """Customize `data` for script integration"""
270 results: list[MiniEnvelope] = []
271 if qualified_action in ("script.turn_on", "script.turn_off"):
272 action_data = {}
273 action_data["variables"] = core_action_data
274 if "variables" in data:
275 action_data["variables"].update(data.pop("variables"))
276 action_data["variables"].update(data)
277 customize_data(action_data, delivery)
278 action_data = hass_api.coerce_schema("script", qualified_action.replace("script.", ""), action_data)
279 results.append(MiniEnvelope(action_data=action_data, target_data={ATTR_ENTITY_ID: target.domain_entity_ids("script")}))
280 else:
281 action_data = core_action_data
282 action_data.update(data)
283 results.append(MiniEnvelope(action_data=action_data))
285 return results
288def ntfy(
289 core_action_data: dict[str, Any],
290 data: dict[str, Any],
291 target: Target,
292 delivery: Delivery,
293 hass_api: HomeAssistantAPI,
294) -> list[MiniEnvelope]:
295 """Customize `data` for ntfy integration"""
296 results: list[MiniEnvelope] = []
297 action_data: dict[str, Any] = dict(core_action_data)
298 action_data.update(data)
299 customize_data(action_data, delivery)
300 action_data = hass_api.coerce_schema("ntfy", "publish", action_data)
302 if ATTR_PRIORITY in action_data and action_data[ATTR_PRIORITY] in PRIORITY_VALUES:
303 action_data[ATTR_PRIORITY] = PRIORITY_VALUES.get(action_data[ATTR_PRIORITY], 3)
305 media = action_data.pop(ATTR_MEDIA, {})
306 if media and media.get(ATTR_MEDIA_SNAPSHOT_URL) and "attach" not in action_data:
307 action_data["attach"] = media.get(ATTR_MEDIA_SNAPSHOT_URL)
308 actions = action_data.pop(ATTR_ACTIONS, [])
309 if len(actions) > 0:
310 first_action = actions[0]
311 if first_action.get(ATTR_ACTION_URL) and "click" not in action_data:
312 action_data["click"] = first_action.get(ATTR_ACTION_URL)
314 if target.email and "email" not in action_data:
315 for email in target.email:
316 call_data: dict[str, Any] = dict(action_data)
317 if len(results) == 1 and len(target.email) == 1:
318 results[0].action_data["email"] = email
319 else:
320 call_data["email"] = email
321 results.append(MiniEnvelope(action_data=call_data))
322 if target.phone and "call" not in action_data:
323 for phone in target.phone:
324 call_data = dict(action_data)
325 if len(results) == 1 and len(target.phone) == 1:
326 results[0].action_data["call"] = phone
327 else:
328 call_data["call"] = phone
329 results.append(MiniEnvelope(action_data=call_data))
330 notify_entities = target.domain_entity_ids(NOTIFY_DOMAIN)
331 if not results or notify_entities:
332 if len(results) == 1:
333 results[0].target_data = {"entity_id": notify_entities}
334 else:
335 results.append(MiniEnvelope(action_data=dict(action_data), target_data={"entity_id": notify_entities}))
337 return results
340def notify_events(
341 message: str | None,
342 title: str | None,
343 core_action_data: dict[str, Any],
344 data: dict[str, Any],
345 delivery: Delivery,
346) -> list[MiniEnvelope]:
347 """Customize `data` for notify_events integration"""
348 results: list[MiniEnvelope] = []
349 input_data: dict[str, Any] = dict(core_action_data)
350 input_data.update(data)
351 customize_data(input_data, delivery)
352 action_data: dict[str, Any] = {}
353 action_data[ATTR_MESSAGE] = message
354 priority_mapping: dict[str, str] = {
355 PRIORITY_MINIMUM: "lowest",
356 PRIORITY_LOW: "low",
357 PRIORITY_MEDIUM: "normal",
358 PRIORITY_HIGH: "high",
359 PRIORITY_CRITICAL: "highest",
360 }
361 if title:
362 action_data.setdefault(ATTR_DATA, {})
363 action_data[ATTR_DATA][ATTR_TITLE] = title
365 if ATTR_DATA in input_data:
366 # notify_events is schema-less for action
367 action_data[ATTR_DATA] = input_data[ATTR_DATA]
369 if ATTR_PRIORITY in input_data and input_data[ATTR_PRIORITY] in PRIORITY_VALUES:
370 action_data.setdefault(ATTR_DATA, {})
371 action_data[ATTR_DATA][ATTR_PRIORITY] = priority_mapping.get(input_data[ATTR_PRIORITY])
372 elif ATTR_PRIORITY in input_data and input_data[ATTR_PRIORITY] in priority_mapping.values():
373 action_data.setdefault(ATTR_DATA, {})
374 action_data[ATTR_DATA][ATTR_PRIORITY] = input_data[ATTR_PRIORITY]
376 if "token" in input_data:
377 action_data.setdefault(ATTR_DATA, {})
378 action_data[ATTR_DATA]["token"] = input_data["token"]
379 if "level" in input_data:
380 action_data.setdefault(ATTR_DATA, {})
381 action_data[ATTR_DATA]["level"] = input_data["level"]
383 if input_data.get(ATTR_MEDIA, {}).get(ATTR_MEDIA_SNAPSHOT_URL) and "images" not in input_data:
384 action_data.setdefault(ATTR_DATA, {})
385 action_data[ATTR_DATA].setdefault("images", [])
386 action_data[ATTR_DATA]["images"].append({"url": input_data.get(ATTR_MEDIA, {}).get(ATTR_MEDIA_SNAPSHOT_URL)})
388 results.append(MiniEnvelope(action_data=dict(action_data)))
390 return results