Coverage for custom_components/supernotify/transports/generic.py: 98%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-03-28 19:39 +0000

1import logging 

2from dataclasses import dataclass, field 

3from typing import TYPE_CHECKING, Any 

4 

5from homeassistant.components.notify import DOMAIN as NOTIFY_DOMAIN 

6from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET 

7 

8# ATTR_VARIABLES from script.const has import issues 

9from homeassistant.const import ATTR_ENTITY_ID 

10 

11from custom_components.supernotify.common import ensure_list 

12from custom_components.supernotify.const import ( 

13 ATTR_ACTION_URL, 

14 ATTR_MEDIA_SNAPSHOT_URL, 

15 ATTR_PRIORITY, 

16 OPTION_DATA_KEYS_SELECT, 

17 OPTION_GENERIC_DOMAIN_STYLE, 

18 OPTION_MESSAGE_USAGE, 

19 OPTION_RAW, 

20 OPTION_SIMPLIFY_TEXT, 

21 OPTION_STRIP_URLS, 

22 OPTION_TARGET_CATEGORIES, 

23 PRIORITY_VALUES, 

24 SELECT_INCLUDE, 

25 TRANSPORT_GENERIC, 

26) 

27from custom_components.supernotify.model import ( 

28 DebugTrace, 

29 MessageOnlyPolicy, 

30 SelectionRule, 

31 Target, 

32 TargetRequired, 

33 TransportConfig, 

34 TransportFeature, 

35) 

36from custom_components.supernotify.transport import ( 

37 Transport, 

38) 

39 

40if TYPE_CHECKING: 

41 from custom_components.supernotify.delivery import Delivery 

42 from custom_components.supernotify.envelope import Envelope 

43 

44_LOGGER = logging.getLogger(__name__) 

45DATA_FIELDS_ALLOWED = { 

46 "light": [ 

47 "transition", 

48 "rgb_color", 

49 "color_temp_kelvin", 

50 "brightness_pct", 

51 "brightness_step_pct", 

52 "effect", 

53 "rgbw_color", 

54 "rgbww_color", 

55 "color_name", 

56 "hs_color", 

57 "xy_color", 

58 "color_temp", 

59 "brightness", 

60 "brightness_step", 

61 "white", 

62 "profile", 

63 "flash", 

64 ], 

65 "siren": ["tone", "duration", "volume_level"], 

66 "mqtt": ["topic", "payload", "evaluate_payload", "qos", "retain"], 

67 "script": ["variables", "wait", "wait_template"], 

68 "ntfy": ["title", "message", "markdown", "tags", "priority", "click", "delay", "attach", "email", "call", "icon"], 

69 "tts": ["cache", "options", "message", "language", "media_player_entity_id", "entity_id", "target"], 

70} 

71 

72 

73class GenericTransport(Transport): 

74 """Call any service, including non-notify ones, like switch.turn_on or mqtt.publish""" 

75 

76 name = TRANSPORT_GENERIC 

77 

78 def __init__(self, *args: Any, **kwargs: Any) -> None: 

79 super().__init__(*args, **kwargs) 

80 

81 @property 

82 def supported_features(self) -> TransportFeature: 

83 return TransportFeature.MESSAGE | TransportFeature.TITLE 

84 

85 @property 

86 def default_config(self) -> TransportConfig: 

87 config = TransportConfig() 

88 config.delivery_defaults.target_required = TargetRequired.OPTIONAL 

89 config.delivery_defaults.options = { 

90 OPTION_SIMPLIFY_TEXT: False, 

91 OPTION_STRIP_URLS: False, 

92 OPTION_RAW: False, 

93 OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD, 

94 OPTION_DATA_KEYS_SELECT: None, 

95 OPTION_GENERIC_DOMAIN_STYLE: None, 

96 } 

97 return config 

98 

99 def validate_action(self, action: str | None) -> bool: 

100 if action is not None and "." in action: 

101 return True 

102 _LOGGER.warning("SUPERNOTIFY generic transport must have a qualified action name, e.g. notify.foo") 

103 return False 

104 

105 async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool: # noqa: ARG002 

106 # inputs 

107 data: dict[str, Any] = envelope.data or {} 

108 core_action_data: dict[str, Any] = envelope.core_action_data(force_message=False) 

109 raw_mode: bool = envelope.delivery.options.get(OPTION_RAW, False) 

110 qualified_action: str | None = envelope.delivery.action 

111 domain: str | None = qualified_action.split(".", 1)[0] if qualified_action and "." in qualified_action else None 

112 equiv_domain: str | None = domain 

113 if envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE): 

114 equiv_domain = envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE) 

115 _LOGGER.debug("SUPERNOTIFY Handling %s generic message as if it was %s", domain, equiv_domain) 

116 

117 # outputs 

118 action_data: dict[str, Any] = {} 

119 target_data: dict[str, Any] | None = {} 

120 build_targets: bool = False 

121 prune_data: bool = True 

122 mini_envelopes: list[MiniEnvelope] = [] 

123 

124 if raw_mode: 

125 action_data = core_action_data 

126 action_data.update(data) 

127 build_targets = True 

128 elif equiv_domain == "notify": 

129 action_data = core_action_data 

130 if qualified_action == "notify.send_message": 

131 # amongst the wild west of notifty handling, at least care for the modern core one 

132 action_data = core_action_data 

133 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)} 

134 prune_data = False 

135 else: 

136 action_data = core_action_data 

137 action_data[ATTR_DATA] = data 

138 build_targets = True 

139 elif equiv_domain == "input_text": 

140 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)} 

141 if "value" in data: 

142 action_data = {"value": data["value"]} 

143 else: 

144 action_data = {"value": core_action_data[ATTR_MESSAGE]} 

145 elif equiv_domain == "switch": 

146 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)} 

147 elif equiv_domain == "mqtt": 

148 action_data = data 

149 if "payload" not in action_data: 

150 action_data["payload"] = envelope.message 

151 # add `payload:` with empty value for empty topic 

152 elif equiv_domain == "tts": 

153 action_data = core_action_data 

154 action_data.update(data) 

155 build_targets = True 

156 elif qualified_action == "ntfy.publish": 

157 mini_envelopes.extend(ntfy(qualified_action, core_action_data, data, envelope.target, envelope.delivery)) 

158 elif equiv_domain in ("siren", "light"): 

159 target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)} 

160 action_data = data 

161 elif equiv_domain == "rest_command": 

162 action_data = data 

163 elif equiv_domain == "script": 

164 mini_envelopes.extend(script(qualified_action, core_action_data, data, envelope.target, envelope.delivery)) 

165 else: 

166 action_data = core_action_data 

167 action_data.update(data) 

168 build_targets = True 

169 

170 if mini_envelopes: 

171 results: list[bool] = [ 

172 await self.call_action( 

173 envelope, qualified_action, action_data=mini_envelope.action_data, target_data=mini_envelope.target_data 

174 ) 

175 for mini_envelope in mini_envelopes 

176 ] 

177 return all(results) 

178 

179 if build_targets: 

180 all_targets: list[str] = [] 

181 if OPTION_TARGET_CATEGORIES in envelope.delivery.options: 

182 for category in ensure_list(envelope.delivery.options.get(OPTION_TARGET_CATEGORIES, [])): 

183 all_targets.extend(envelope.target.for_category(category)) 

184 else: 

185 all_targets = envelope.target.resolved_targets() 

186 if len(all_targets) == 1: 

187 action_data[ATTR_TARGET] = all_targets[0] 

188 elif len(all_targets) >= 1: 

189 action_data[ATTR_TARGET] = all_targets 

190 

191 if prune_data and action_data: 

192 action_data = customize_data(action_data, domain if not raw_mode else None, envelope.delivery) 

193 

194 return await self.call_action(envelope, qualified_action, action_data=action_data, target_data=target_data or None) 

195 

196 

197def customize_data(data: dict[str, Any], domain: str | None, delivery: Delivery) -> dict[str, Any]: 

198 if not data: 

199 return data 

200 if delivery.options.get(OPTION_DATA_KEYS_SELECT): 

201 selection_rule: SelectionRule | None = SelectionRule(delivery.options.get(OPTION_DATA_KEYS_SELECT)) 

202 else: 

203 selection_rule = None 

204 

205 if selection_rule is None and domain and domain in DATA_FIELDS_ALLOWED: 

206 selection_rule = SelectionRule({SELECT_INCLUDE: DATA_FIELDS_ALLOWED[domain]}) 

207 

208 if selection_rule is None and ATTR_DATA not in data: 

209 return data 

210 pruned: dict[str, Any] = {} 

211 for key in data: 

212 if selection_rule is None or selection_rule.match(key): 

213 pruned[key] = data[key] 

214 if ATTR_DATA in pruned and not pruned[ATTR_DATA]: 

215 del pruned[ATTR_DATA] 

216 return pruned 

217 

218 

219@dataclass 

220class MiniEnvelope: 

221 action_data: dict[str, Any] = field(default_factory=dict) 

222 target_data: dict[str, Any] | None = None 

223 

224 

225def script( 

226 qualified_action: str | None, core_action_data: dict[str, Any], data: dict[str, Any], target: Target, delivery: Delivery 

227) -> list[MiniEnvelope]: 

228 """Customize `data` for script integration""" 

229 results: list[MiniEnvelope] = [] 

230 if qualified_action in ("script.turn_on", "script.turn_off"): 

231 action_data = {} 

232 action_data["variables"] = core_action_data 

233 if "variables" in data: 

234 action_data["variables"].update(data.pop("variables")) 

235 action_data["variables"].update(data) 

236 customize_data(action_data, "script", delivery) 

237 results.append(MiniEnvelope(action_data=action_data, target_data={ATTR_ENTITY_ID: target.domain_entity_ids("script")})) 

238 else: 

239 action_data = core_action_data 

240 action_data.update(data) 

241 results.append(MiniEnvelope(action_data=action_data)) 

242 

243 return results 

244 

245 

246def ntfy( 

247 qualified_action: str | None, # noqa: ARG001 

248 core_action_data: dict[str, Any], 

249 data: dict[str, Any], 

250 target: Target, 

251 delivery: Delivery, 

252) -> list[MiniEnvelope]: 

253 """Customize `data` for ntfy integration""" 

254 results: list[MiniEnvelope] = [] 

255 action_data: dict[str, Any] = dict(core_action_data) 

256 action_data.update(data) 

257 customize_data(action_data, "ntfy", delivery) 

258 

259 if ATTR_PRIORITY in action_data and action_data[ATTR_PRIORITY] in PRIORITY_VALUES: 

260 action_data[ATTR_PRIORITY] = PRIORITY_VALUES.get(action_data[ATTR_PRIORITY], 3) 

261 

262 if action_data.get(ATTR_MEDIA_SNAPSHOT_URL) and "attach" not in action_data: 

263 action_data["attach"] = action_data.get(ATTR_MEDIA_SNAPSHOT_URL) 

264 if action_data.get(ATTR_ACTION_URL) and "click" not in action_data: 

265 action_data["click"] = action_data.get(ATTR_ACTION_URL) 

266 

267 if target.email and "email" not in action_data: 

268 for email in target.email: 

269 call_data: dict[str, Any] = dict(action_data) 

270 if len(results) == 1 and len(target.email) == 1: 

271 results[0].action_data["email"] = email 

272 else: 

273 call_data["email"] = email 

274 results.append(MiniEnvelope(action_data=call_data)) 

275 if target.phone and "call" not in action_data: 

276 for phone in target.phone: 

277 call_data = dict(action_data) 

278 if len(results) == 1 and len(target.phone) == 1: 

279 results[0].action_data["call"] = phone 

280 else: 

281 call_data["call"] = phone 

282 results.append(MiniEnvelope(action_data=call_data)) 

283 notify_entities = target.domain_entity_ids(NOTIFY_DOMAIN) 

284 if not results or notify_entities: 

285 if len(results) == 1: 

286 results[0].target_data = {"entity_id": notify_entities} 

287 else: 

288 results.append(MiniEnvelope(action_data=dict(action_data), target_data={"entity_id": notify_entities})) 

289 

290 return results