Coverage for custom_components/supernotify/transports/generic.py: 18%
169 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
1import logging
2from dataclasses import dataclass, field
3from typing import Any
5from homeassistant.components.notify import DOMAIN as NOTIFY_DOMAIN
6from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET
8# ATTR_VARIABLES from script.const has import issues
9from homeassistant.const import ATTR_ENTITY_ID
11from custom_components.supernotify import (
12 ATTR_ACTION_URL,
13 ATTR_MEDIA_SNAPSHOT_URL,
14 ATTR_PRIORITY,
15 OPTION_DATA_KEYS_SELECT,
16 OPTION_GENERIC_DOMAIN_STYLE,
17 OPTION_MESSAGE_USAGE,
18 OPTION_RAW,
19 OPTION_SIMPLIFY_TEXT,
20 OPTION_STRIP_URLS,
21 OPTION_TARGET_CATEGORIES,
22 PRIORITY_VALUES,
23 SELECT_INCLUDE,
24 TRANSPORT_GENERIC,
25)
26from custom_components.supernotify.common import ensure_list
27from custom_components.supernotify.delivery import Delivery
28from custom_components.supernotify.envelope import Envelope
29from custom_components.supernotify.model import (
30 DebugTrace,
31 MessageOnlyPolicy,
32 SelectionRule,
33 Target,
34 TargetRequired,
35 TransportConfig,
36 TransportFeature,
37)
38from custom_components.supernotify.transport import (
39 Transport,
40)
42_LOGGER = logging.getLogger(__name__)
43DATA_FIELDS_ALLOWED = {
44 "light": [
45 "transition",
46 "rgb_color",
47 "color_temp_kelvin",
48 "brightness_pct",
49 "brightness_step_pct",
50 "effect",
51 "rgbw_color",
52 "rgbww_color",
53 "color_name",
54 "hs_color",
55 "xy_color",
56 "color_temp",
57 "brightness",
58 "brightness_step",
59 "white",
60 "profile",
61 "flash",
62 ],
63 "siren": ["tone", "duration", "volume_level"],
64 "mqtt": ["topic", "payload", "evaluate_payload", "qos", "retain"],
65 "script": ["variables", "wait", "wait_template"],
66 "ntfy": ["title", "message", "markdown", "tags", "priority", "click", "delay", "attach", "email", "call", "icon"],
67 "tts": ["cache", "options", "message", "language", "media_player_entity_id", "entity_id", "target"],
68}
71class GenericTransport(Transport):
72 """Call any service, including non-notify ones, like switch.turn_on or mqtt.publish"""
74 name = TRANSPORT_GENERIC
76 def __init__(self, *args: Any, **kwargs: Any) -> None:
77 super().__init__(*args, **kwargs)
79 @property
80 def supported_features(self) -> TransportFeature:
81 return TransportFeature.MESSAGE | TransportFeature.TITLE
83 @property
84 def default_config(self) -> TransportConfig:
85 config = TransportConfig()
86 config.delivery_defaults.target_required = TargetRequired.OPTIONAL
87 config.delivery_defaults.options = {
88 OPTION_SIMPLIFY_TEXT: False,
89 OPTION_STRIP_URLS: False,
90 OPTION_RAW: False,
91 OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD,
92 OPTION_DATA_KEYS_SELECT: None,
93 OPTION_GENERIC_DOMAIN_STYLE: None,
94 }
95 return config
97 def validate_action(self, action: str | None) -> bool:
98 if action is not None and "." in action:
99 return True
100 _LOGGER.warning("SUPERNOTIFY generic transport must have a qualified action name, e.g. notify.foo")
101 return False
103 async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool: # noqa: ARG002
104 # inputs
105 data: dict[str, Any] = envelope.data or {}
106 core_action_data: dict[str, Any] = envelope.core_action_data(force_message=False)
107 raw_mode: bool = envelope.delivery.options.get(OPTION_RAW, False)
108 qualified_action: str | None = envelope.delivery.action
109 domain: str | None = qualified_action.split(".", 1)[0] if qualified_action and "." in qualified_action else None
110 equiv_domain: str | None = domain
111 if envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE):
112 equiv_domain = envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE)
113 _LOGGER.debug("SUPERNOTIFY Handling %s generic message as if it was %s", domain, equiv_domain)
115 # outputs
116 action_data: dict[str, Any] = {}
117 target_data: dict[str, Any] | None = {}
118 build_targets: bool = False
119 prune_data: bool = True
120 mini_envelopes: list[MiniEnvelope] = []
122 if raw_mode:
123 action_data = core_action_data
124 action_data.update(data)
125 build_targets = True
126 elif equiv_domain == "notify":
127 action_data = core_action_data
128 if qualified_action == "notify.send_message":
129 # amongst the wild west of notifty handling, at least care for the modern core one
130 action_data = core_action_data
131 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
132 prune_data = False
133 else:
134 action_data = core_action_data
135 action_data[ATTR_DATA] = data
136 build_targets = True
137 elif equiv_domain == "input_text":
138 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
139 if "value" in data:
140 action_data = {"value": data["value"]}
141 else:
142 action_data = {"value": core_action_data[ATTR_MESSAGE]}
143 elif equiv_domain == "switch":
144 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
145 elif equiv_domain == "mqtt":
146 action_data = data
147 if "payload" not in action_data:
148 action_data["payload"] = envelope.message
149 # add `payload:` with empty value for empty topic
150 elif equiv_domain == "tts":
151 action_data = core_action_data
152 action_data.update(data)
153 build_targets = True
154 elif qualified_action == "ntfy.publish":
155 mini_envelopes.extend(ntfy(qualified_action, core_action_data, data, envelope.target, envelope.delivery))
156 elif equiv_domain in ("siren", "light"):
157 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
158 action_data = data
159 elif equiv_domain == "rest_command":
160 action_data = data
161 elif equiv_domain == "script":
162 mini_envelopes.extend(script(qualified_action, core_action_data, data, envelope.target, envelope.delivery))
163 else:
164 action_data = core_action_data
165 action_data.update(data)
166 build_targets = True
168 if mini_envelopes:
169 results: list[bool] = [
170 await self.call_action(
171 envelope, qualified_action, action_data=mini_envelope.action_data, target_data=mini_envelope.target_data
172 )
173 for mini_envelope in mini_envelopes
174 ]
175 return all(results)
177 if build_targets:
178 all_targets: list[str] = []
179 if OPTION_TARGET_CATEGORIES in envelope.delivery.options:
180 for category in ensure_list(envelope.delivery.options.get(OPTION_TARGET_CATEGORIES, [])):
181 all_targets.extend(envelope.target.for_category(category))
182 else:
183 all_targets = envelope.target.resolved_targets()
184 if len(all_targets) == 1:
185 action_data[ATTR_TARGET] = all_targets[0]
186 elif len(all_targets) >= 1:
187 action_data[ATTR_TARGET] = all_targets
189 if prune_data and action_data:
190 action_data = customize_data(action_data, domain if not raw_mode else None, envelope.delivery)
192 return await self.call_action(envelope, qualified_action, action_data=action_data, target_data=target_data or None)
195def customize_data(data: dict[str, Any], domain: str | None, delivery: Delivery) -> dict[str, Any]:
196 if not data:
197 return data
198 if delivery.options.get(OPTION_DATA_KEYS_SELECT):
199 selection_rule: SelectionRule | None = SelectionRule(delivery.options.get(OPTION_DATA_KEYS_SELECT))
200 else:
201 selection_rule = None
203 if selection_rule is None and domain and domain in DATA_FIELDS_ALLOWED:
204 selection_rule = SelectionRule({SELECT_INCLUDE: DATA_FIELDS_ALLOWED[domain]})
206 if selection_rule is None and ATTR_DATA not in data:
207 return data
208 pruned: dict[str, Any] = {}
209 for key in data:
210 if selection_rule is None or selection_rule.match(key):
211 pruned[key] = data[key]
212 if ATTR_DATA in pruned and not pruned[ATTR_DATA]:
213 del pruned[ATTR_DATA]
214 return pruned
217@dataclass
218class MiniEnvelope:
219 action_data: dict[str, Any] = field(default_factory=dict)
220 target_data: dict[str, Any] | None = None
223def script(
224 qualified_action: str | None, core_action_data: dict[str, Any], data: dict[str, Any], target: Target, delivery: Delivery
225) -> list[MiniEnvelope]:
226 """Customize `data` for script integration"""
227 results: list[MiniEnvelope] = []
228 if qualified_action in ("script.turn_on", "script.turn_off"):
229 action_data = {}
230 action_data["variables"] = core_action_data
231 if "variables" in data:
232 action_data["variables"].update(data.pop("variables"))
233 action_data["variables"].update(data)
234 customize_data(action_data, "script", delivery)
235 results.append(MiniEnvelope(action_data=action_data, target_data={ATTR_ENTITY_ID: target.domain_entity_ids("script")}))
236 else:
237 action_data = core_action_data
238 action_data.update(data)
239 results.append(MiniEnvelope(action_data=action_data))
241 return results
244def ntfy(
245 qualified_action: str | None, # noqa: ARG001
246 core_action_data: dict[str, Any],
247 data: dict[str, Any],
248 target: Target,
249 delivery: Delivery,
250) -> list[MiniEnvelope]:
251 """Customize `data` for ntfy integration"""
252 results: list[MiniEnvelope] = []
253 action_data: dict[str, Any] = dict(core_action_data)
254 action_data.update(data)
255 customize_data(action_data, "ntfy", delivery)
257 if ATTR_PRIORITY in action_data and action_data[ATTR_PRIORITY] in PRIORITY_VALUES:
258 action_data[ATTR_PRIORITY] = PRIORITY_VALUES.get(action_data[ATTR_PRIORITY], 3)
260 if action_data.get(ATTR_MEDIA_SNAPSHOT_URL) and "attach" not in action_data:
261 action_data["attach"] = action_data.get(ATTR_MEDIA_SNAPSHOT_URL)
262 if action_data.get(ATTR_ACTION_URL) and "click" not in action_data:
263 action_data["click"] = action_data.get(ATTR_ACTION_URL)
265 if target.email and "email" not in action_data:
266 for email in target.email:
267 call_data: dict[str, Any] = dict(action_data)
268 if len(results) == 1 and len(target.email) == 1:
269 results[0].action_data["email"] = email
270 else:
271 call_data["email"] = email
272 results.append(MiniEnvelope(action_data=call_data))
273 if target.phone and "call" not in action_data:
274 for phone in target.phone:
275 call_data = dict(action_data)
276 if len(results) == 1 and len(target.phone) == 1:
277 results[0].action_data["call"] = phone
278 else:
279 call_data["call"] = phone
280 results.append(MiniEnvelope(action_data=call_data))
281 notify_entities = target.domain_entity_ids(NOTIFY_DOMAIN)
282 if not results or notify_entities:
283 if len(results) == 1:
284 results[0].target_data = {"entity_id": notify_entities}
285 else:
286 results.append(MiniEnvelope(action_data=dict(action_data), target_data={"entity_id": notify_entities}))
288 return results