Coverage for custom_components/supernotify/transports/generic.py: 98%
167 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-03-28 19:39 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-03-28 19:39 +0000
1import logging
2from dataclasses import dataclass, field
3from typing import TYPE_CHECKING, Any
5from homeassistant.components.notify import DOMAIN as NOTIFY_DOMAIN
6from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET
8# ATTR_VARIABLES from script.const has import issues
9from homeassistant.const import ATTR_ENTITY_ID
11from custom_components.supernotify.common import ensure_list
12from custom_components.supernotify.const import (
13 ATTR_ACTION_URL,
14 ATTR_MEDIA_SNAPSHOT_URL,
15 ATTR_PRIORITY,
16 OPTION_DATA_KEYS_SELECT,
17 OPTION_GENERIC_DOMAIN_STYLE,
18 OPTION_MESSAGE_USAGE,
19 OPTION_RAW,
20 OPTION_SIMPLIFY_TEXT,
21 OPTION_STRIP_URLS,
22 OPTION_TARGET_CATEGORIES,
23 PRIORITY_VALUES,
24 SELECT_INCLUDE,
25 TRANSPORT_GENERIC,
26)
27from custom_components.supernotify.model import (
28 DebugTrace,
29 MessageOnlyPolicy,
30 SelectionRule,
31 Target,
32 TargetRequired,
33 TransportConfig,
34 TransportFeature,
35)
36from custom_components.supernotify.transport import (
37 Transport,
38)
40if TYPE_CHECKING:
41 from custom_components.supernotify.delivery import Delivery
42 from custom_components.supernotify.envelope import Envelope
_LOGGER = logging.getLogger(__name__)

# Per-domain allow-list of top-level keys kept in the action data.
# `customize_data` turns the list for a domain into an "include" selection
# rule when the delivery has no explicit data-key selection option of its own.
DATA_FIELDS_ALLOWED: dict[str, list[str]] = {
    "light": [
        "transition", "rgb_color", "color_temp_kelvin",
        "brightness_pct", "brightness_step_pct", "effect",
        "rgbw_color", "rgbww_color", "color_name",
        "hs_color", "xy_color", "color_temp",
        "brightness", "brightness_step", "white",
        "profile", "flash",
    ],  # fmt: skip
    "siren": ["tone", "duration", "volume_level"],
    "mqtt": ["topic", "payload", "evaluate_payload", "qos", "retain"],
    "script": ["variables", "wait", "wait_template"],
    "ntfy": [
        "title", "message", "markdown", "tags", "priority", "click",
        "delay", "attach", "email", "call", "icon",
    ],  # fmt: skip
    "tts": [
        "cache", "options", "message", "language",
        "media_player_entity_id", "entity_id", "target",
    ],  # fmt: skip
}
class GenericTransport(Transport):
    """Call any service, including non-notify ones, like switch.turn_on or mqtt.publish

    The shape of the service call is chosen from the domain of the delivery's
    action (or from the domain-style option when it is set), see `deliver`.
    """

    name = TRANSPORT_GENERIC

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

    @property
    def supported_features(self) -> TransportFeature:
        """Features this transport declares: message and title."""
        return TransportFeature.MESSAGE | TransportFeature.TITLE

    @property
    def default_config(self) -> TransportConfig:
        """Build a fresh config with optional targets and the default delivery options."""
        config = TransportConfig()
        config.delivery_defaults.target_required = TargetRequired.OPTIONAL
        config.delivery_defaults.options = {
            OPTION_SIMPLIFY_TEXT: False,
            OPTION_STRIP_URLS: False,
            OPTION_RAW: False,
            OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD,
            OPTION_DATA_KEYS_SELECT: None,
            OPTION_GENERIC_DOMAIN_STYLE: None,
        }
        return config

    def validate_action(self, action: str | None) -> bool:
        """Return True when `action` is a qualified `domain.service` name.

        Logs a warning and returns False for None or a name without a dot.
        """
        if action is not None and "." in action:
            return True
        _LOGGER.warning("SUPERNOTIFY generic transport must have a qualified action name, e.g. notify.foo")
        return False

    async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool:  # noqa: ARG002
        """Build the service call(s) for `envelope` and invoke them via `call_action`.

        The action data and target data are assembled differently per domain.
        `ntfy.publish` and `script` actions are expanded by the module-level
        `ntfy` / `script` helpers into one or more mini envelopes, each of
        which becomes its own call.

        Args:
            envelope: the notification being delivered; its delivery supplies
                the action name and options, its target supplies entity ids.
            debug_trace: not used by this transport.

        Returns:
            The result of `call_action`; when several calls are made, True
            only if every one of them returned a truthy result.
        """
        # inputs
        data: dict[str, Any] = envelope.data or {}
        core_action_data: dict[str, Any] = envelope.core_action_data(force_message=False)
        raw_mode: bool = envelope.delivery.options.get(OPTION_RAW, False)
        qualified_action: str | None = envelope.delivery.action
        domain: str | None = qualified_action.split(".", 1)[0] if qualified_action and "." in qualified_action else None
        equiv_domain: str | None = domain
        domain_style = envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE)
        if domain_style:
            # the action is handled with another domain's conventions
            equiv_domain = domain_style
            _LOGGER.debug("SUPERNOTIFY Handling %s generic message as if it was %s", domain, equiv_domain)

        # outputs
        action_data: dict[str, Any] = {}
        target_data: dict[str, Any] | None = {}
        build_targets: bool = False
        prune_data: bool = True
        mini_envelopes: list[MiniEnvelope] = []

        if raw_mode:
            action_data = core_action_data
            action_data.update(data)
            build_targets = True
        elif equiv_domain == "notify":
            action_data = core_action_data
            if qualified_action == "notify.send_message":
                # amongst the wild west of notify handling, at least care for the modern core one
                target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
                prune_data = False
            else:
                action_data[ATTR_DATA] = data
                build_targets = True
        elif equiv_domain == "input_text":
            target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
            if "value" in data:
                action_data = {"value": data["value"]}
            else:
                action_data = {"value": core_action_data[ATTR_MESSAGE]}
        elif equiv_domain == "switch":
            target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
        elif equiv_domain == "mqtt":
            # copy, so that defaulting the payload does not write into the envelope's own data
            action_data = dict(data)
            if "payload" not in action_data:
                action_data["payload"] = envelope.message
        elif equiv_domain == "tts":
            action_data = core_action_data
            action_data.update(data)
            build_targets = True
        elif qualified_action == "ntfy.publish":
            mini_envelopes.extend(ntfy(qualified_action, core_action_data, data, envelope.target, envelope.delivery))
        elif equiv_domain in ("siren", "light"):
            target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
            action_data = data
        elif equiv_domain == "rest_command":
            action_data = data
        elif equiv_domain == "script":
            mini_envelopes.extend(script(qualified_action, core_action_data, data, envelope.target, envelope.delivery))
        else:
            action_data = core_action_data
            action_data.update(data)
            build_targets = True

        if mini_envelopes:
            # every mini envelope is attempted, even if an earlier one fails
            results: list[bool] = [
                await self.call_action(
                    envelope, qualified_action, action_data=mini_envelope.action_data, target_data=mini_envelope.target_data
                )
                for mini_envelope in mini_envelopes
            ]
            return all(results)

        if build_targets:
            all_targets: list[str] = []
            if OPTION_TARGET_CATEGORIES in envelope.delivery.options:
                for category in ensure_list(envelope.delivery.options.get(OPTION_TARGET_CATEGORIES, [])):
                    all_targets.extend(envelope.target.for_category(category))
            else:
                all_targets = envelope.target.resolved_targets()
            # a single target is passed as a scalar, several as a list, none at all is omitted
            if len(all_targets) == 1:
                action_data[ATTR_TARGET] = all_targets[0]
            elif len(all_targets) > 1:
                action_data[ATTR_TARGET] = all_targets

        if prune_data and action_data:
            # raw mode skips the per-domain allow-list by passing no domain
            action_data = customize_data(action_data, domain if not raw_mode else None, envelope.delivery)

        return await self.call_action(envelope, qualified_action, action_data=action_data, target_data=target_data or None)
def customize_data(data: dict[str, Any], domain: str | None, delivery: Delivery) -> dict[str, Any]:
    """Filter the top-level keys of `data` with a selection rule.

    The rule comes from the delivery's data-key selection option when that is
    set; otherwise from the `DATA_FIELDS_ALLOWED` entry for `domain`, if any.

    Returns `data` itself when it is empty, or when no rule applies and it has
    no `ATTR_DATA` key. Otherwise returns a new dict holding the keys the rule
    matches (all keys when there is no rule), minus an empty `ATTR_DATA` entry.
    """
    if not data:
        return data

    configured = delivery.options.get(OPTION_DATA_KEYS_SELECT)
    rule: SelectionRule | None = SelectionRule(configured) if configured else None
    if rule is None and domain and domain in DATA_FIELDS_ALLOWED:
        # fall back to the built-in allow-list for this domain
        rule = SelectionRule({SELECT_INCLUDE: DATA_FIELDS_ALLOWED[domain]})

    if rule is None and ATTR_DATA not in data:
        # nothing to filter and no nested data entry to tidy up
        return data

    kept: dict[str, Any] = {name: value for name, value in data.items() if rule is None or rule.match(name)}
    if ATTR_DATA in kept and not kept[ATTR_DATA]:
        del kept[ATTR_DATA]
    return kept
@dataclass
class MiniEnvelope:
    """Arguments for one service call produced when a delivery fans out into several calls."""

    # passed to the transport's `call_action` as `action_data`; each instance gets its own dict
    action_data: dict[str, Any] = field(default_factory=dict)
    # passed to `call_action` as `target_data`; stays None unless a target is set
    target_data: dict[str, Any] | None = field(default=None)
def script(
    qualified_action: str | None, core_action_data: dict[str, Any], data: dict[str, Any], target: Target, delivery: Delivery
) -> list[MiniEnvelope]:
    """Customize `data` for script integration

    For `script.turn_on` / `script.turn_off` everything is passed as script
    `variables`: the core action data first, then any explicit `variables`
    entry from `data`, then the remaining `data` keys (later sources win).
    The call targets the script entities of `target`.

    For any other script action, the core action data merged with `data`
    is passed directly as the action data, with no target.

    Neither `core_action_data` nor `data` is modified.

    Returns:
        A list holding the single call to make.
    """
    results: list[MiniEnvelope] = []
    if qualified_action in ("script.turn_on", "script.turn_off"):
        # work on copies so the caller's dicts are left untouched
        extras: dict[str, Any] = dict(data)
        variables: dict[str, Any] = dict(core_action_data)
        if "variables" in extras:
            variables.update(extras.pop("variables"))
        variables.update(extras)
        # customize_data returns the filtered dict rather than filtering in place
        action_data = customize_data({"variables": variables}, "script", delivery)
        results.append(MiniEnvelope(action_data=action_data, target_data={ATTR_ENTITY_ID: target.domain_entity_ids("script")}))
    else:
        results.append(MiniEnvelope(action_data={**core_action_data, **data}))

    return results
def ntfy(
    qualified_action: str | None,  # noqa: ARG001
    core_action_data: dict[str, Any],
    data: dict[str, Any],
    target: Target,
    delivery: Delivery,
) -> list[MiniEnvelope]:
    """Customize `data` for ntfy integration

    Steps, in order:
      1. Merge `core_action_data` and `data` (`data` wins) into a new dict.
      2. Replace a priority found in `PRIORITY_VALUES` with its mapped value.
      3. Default `attach` from the media snapshot URL and `click` from the
         action URL when those are present and not explicitly set.
      4. Filter the keys through `customize_data` for the "ntfy" domain. This
         happens after step 3 so the source URL keys are still available
         for mapping before being filtered.
      5. Fan out: one call per target e-mail address and per target phone
         number, unless `email` / `call` were given explicitly. A single
         phone number is merged into a single existing call.
      6. Notify entities of the target are attached to the only call when
         there is exactly one, otherwise sent as an additional call. A call is
         always produced, even when there are no e-mails, phones or entities.

    Neither `core_action_data` nor `data` is modified.

    Returns:
        The list of calls to make, never empty.
    """
    results: list[MiniEnvelope] = []
    action_data: dict[str, Any] = {**core_action_data, **data}

    if ATTR_PRIORITY in action_data and action_data[ATTR_PRIORITY] in PRIORITY_VALUES:
        action_data[ATTR_PRIORITY] = PRIORITY_VALUES.get(action_data[ATTR_PRIORITY], 3)

    if action_data.get(ATTR_MEDIA_SNAPSHOT_URL) and "attach" not in action_data:
        action_data["attach"] = action_data.get(ATTR_MEDIA_SNAPSHOT_URL)
    if action_data.get(ATTR_ACTION_URL) and "click" not in action_data:
        action_data["click"] = action_data.get(ATTR_ACTION_URL)

    # customize_data returns the filtered dict rather than filtering in place
    action_data = customize_data(action_data, "ntfy", delivery)

    if target.email and "email" not in action_data:
        # results is still empty here, so every address gets a call of its own
        results.extend(MiniEnvelope(action_data={**action_data, "email": email}) for email in target.email)
    if target.phone and "call" not in action_data:
        for phone in target.phone:
            if len(results) == 1 and len(target.phone) == 1:
                # one e-mail call and one phone number: combine them into one call
                results[0].action_data["call"] = phone
            else:
                results.append(MiniEnvelope(action_data={**action_data, "call": phone}))
    notify_entities = target.domain_entity_ids(NOTIFY_DOMAIN)
    if not results or notify_entities:
        if len(results) == 1:
            results[0].target_data = {"entity_id": notify_entities}
        else:
            results.append(MiniEnvelope(action_data=dict(action_data), target_data={"entity_id": notify_entities}))

    return results