Coverage for custom_components/supernotify/transports/generic.py: 97%

208 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-01 15:06 +0000

1import logging 

2from dataclasses import dataclass, field 

3from typing import TYPE_CHECKING, Any 

4 

5from homeassistant.components.notify import DOMAIN as NOTIFY_DOMAIN 

6from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET, ATTR_TITLE 

7 

8# ATTR_VARIABLES from script.const has import issues 

9from homeassistant.const import ATTR_ENTITY_ID 

10 

11from custom_components.supernotify.common import ensure_list 

12from custom_components.supernotify.const import ( 

13 ATTR_ACTION_URL, 

14 ATTR_ACTIONS, 

15 ATTR_MEDIA, 

16 ATTR_MEDIA_SNAPSHOT_URL, 

17 ATTR_PRIORITY, 

18 OPTION_DATA_KEYS_SELECT, 

19 OPTION_GENERIC_DOMAIN_STYLE, 

20 OPTION_MESSAGE_USAGE, 

21 OPTION_RAW, 

22 OPTION_SIMPLIFY_TEXT, 

23 OPTION_STRIP_URLS, 

24 OPTION_TARGET_CATEGORIES, 

25 PRIORITY_CRITICAL, 

26 PRIORITY_HIGH, 

27 PRIORITY_LOW, 

28 PRIORITY_MEDIUM, 

29 PRIORITY_MINIMUM, 

30 PRIORITY_VALUES, 

31 TRANSPORT_GENERIC, 

32) 

33from custom_components.supernotify.model import ( 

34 DebugTrace, 

35 MessageOnlyPolicy, 

36 SelectionRule, 

37 Target, 

38 TargetRequired, 

39 TransportConfig, 

40 TransportFeature, 

41) 

42from custom_components.supernotify.transport import ( 

43 Transport, 

44) 

45 

46if TYPE_CHECKING: 

47 from custom_components.supernotify.delivery import Delivery 

48 from custom_components.supernotify.envelope import Envelope 

49 from custom_components.supernotify.hass_api import HomeAssistantAPI 

50 

_LOGGER = logging.getLogger(__name__)

# Historical note, kept for reference only (not executed):
# the per-domain allow-list below was replaced by reuse of the original
# service schema to prune out fields.
#
# DATA_FIELDS_ALLOWED_BY_DOMAIN = {
#     "light": [
#         "transition",
#         "rgb_color",
#         "color_temp_kelvin",
#         "brightness_pct",
#         "brightness_step_pct",
#         "effect",
#         "rgbw_color",
#         "rgbww_color",
#         "color_name",
#         "hs_color",
#         "xy_color",
#         "color_temp",
#         "brightness",
#         "brightness_step",
#         "white",
#         "profile",
#         "flash",
#     ],
#     "siren": ["tone", "duration", "volume_level"],
#     "mqtt": ["topic", "payload", "evaluate_payload", "qos", "retain"],
#     "script": ["variables", "wait", "wait_template"],
#     "ntfy": [
#         "title",
#         "message",
#         "markdown",
#         "tags",
#         "priority",
#         "click",
#         "delay",
#         "attach",
#         "attach_file",
#         "filename",
#         "email",
#         "call",
#         "icon",
#         "action",
#         "sequence_id",
#     ],
#     "tts": ["cache", "options", "message", "language", "media_player_entity_id", "entity_id", "target"],
# }

97 

98 

class GenericTransport(Transport):
    """Call any service, including non-notify ones, like switch.turn_on or mqtt.publish"""

    name = TRANSPORT_GENERIC

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

    @property
    def supported_features(self) -> TransportFeature:
        """Features declared by this transport: message and title."""
        return TransportFeature.MESSAGE | TransportFeature.TITLE

    @property
    def default_config(self) -> TransportConfig:
        """Build the default config: targets optional, all text/data shaping options switched off."""
        config = TransportConfig()
        config.delivery_defaults.target_required = TargetRequired.OPTIONAL
        config.delivery_defaults.options = {
            OPTION_SIMPLIFY_TEXT: False,
            OPTION_STRIP_URLS: False,
            OPTION_RAW: False,
            OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD,
            OPTION_DATA_KEYS_SELECT: None,
            OPTION_GENERIC_DOMAIN_STYLE: None,
        }
        return config

    def validate_action(self, action: str | None) -> bool:
        """Accept only qualified action names (containing a `.`); log a warning and reject anything else."""
        if action is not None and "." in action:
            return True
        _LOGGER.warning("SUPERNOTIFY generic transport must have a qualified action name, e.g. notify.foo")
        return False

    async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool:  # noqa: ARG002
        """Shape the envelope into action/target data for the configured action and call it.

        The payload layout is chosen from the action's domain, or from the
        `OPTION_GENERIC_DOMAIN_STYLE` delivery option when that is set. Domains that
        produce several calls (`notify_events`, `ntfy.publish`, `script`) are handled via
        mini envelopes, one `call_action` per mini envelope.

        Returns:
            The result of `call_action`; for mini envelopes, True only if every call
            returned a truthy result.

        """
        # inputs
        # Shallow copy: several branches below use `data` directly as the outgoing payload and
        # then add or remove keys, which must not leak back into the envelope's own data.
        data: dict[str, Any] = dict(envelope.data or {})
        core_action_data: dict[str, Any] = envelope.core_action_data(force_message=False)
        raw_mode: bool = envelope.delivery.options.get(OPTION_RAW, False)
        qualified_action: str | None = envelope.delivery.action
        split_action = (
            qualified_action.split(".", 1) if qualified_action and "." in qualified_action else [None, qualified_action]
        )
        domain: str | None = split_action[0]
        service: str | None = split_action[1]

        equiv_domain: str | None = domain
        if envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE):
            equiv_domain = envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE)
            _LOGGER.debug("SUPERNOTIFY Handling %s generic message as if it was %s", domain, equiv_domain)

        # outputs
        action_data: dict[str, Any] = {}
        target_data: dict[str, Any] | None = {}
        build_targets: bool = False
        prune_data: bool = True
        mini_envelopes: list[MiniEnvelope] = []  # only used for script, ntfy and notify_events

        if raw_mode:
            action_data = core_action_data
            action_data.update(data)
            build_targets = True
        elif equiv_domain == "notify":
            action_data = core_action_data
            if qualified_action == "notify.send_message":
                # amongst the wild west of notify handling, at least care for the modern core one
                target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
                prune_data = False
            else:
                action_data[ATTR_DATA] = data
                build_targets = True
        elif equiv_domain == "input_text":
            target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
            if "value" in data:
                action_data = {"value": data["value"]}
            else:
                # NOTE(review): raises KeyError if core_action_data carries no message - confirm that cannot happen
                action_data = {"value": core_action_data[ATTR_MESSAGE]}
        elif equiv_domain == "switch":
            target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
        elif equiv_domain == "mqtt":
            action_data = data
            if "payload" not in action_data:
                action_data["payload"] = envelope.message
                # add `payload:` with empty value for empty topic
        elif equiv_domain == "tts":
            action_data = core_action_data
            action_data.update(data)
            build_targets = True
        elif equiv_domain == "notify_events":
            mini_envelopes.extend(notify_events(envelope.message, envelope.title, core_action_data, data, envelope.delivery))
        elif qualified_action == "ntfy.publish":
            mini_envelopes.extend(ntfy(core_action_data, data, envelope.target, envelope.delivery, self.hass_api))
        elif equiv_domain in ("siren", "light"):
            target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
            action_data = data
        elif equiv_domain == "rest_command":
            action_data = data
        elif equiv_domain == "script":
            mini_envelopes.extend(
                script(qualified_action, core_action_data, data, envelope.target, envelope.delivery, self.hass_api)
            )
        else:
            action_data = core_action_data
            action_data.update(data)
            build_targets = True

        if mini_envelopes:
            # every call is awaited before the results are combined, so a failure does not skip later calls
            results: list[bool] = [
                await self.call_action(
                    envelope, qualified_action, action_data=mini_envelope.action_data, target_data=mini_envelope.target_data
                )
                for mini_envelope in mini_envelopes
            ]
            return all(results)

        if build_targets:
            all_targets: list[str] = []
            if OPTION_TARGET_CATEGORIES in envelope.delivery.options:
                for category in ensure_list(envelope.delivery.options.get(OPTION_TARGET_CATEGORIES, [])):
                    all_targets.extend(envelope.target.for_category(category))
            else:
                all_targets = envelope.target.resolved_targets()
            # a lone target is passed as a scalar, several as a list, none at all leaves the key unset
            if len(all_targets) == 1:
                action_data[ATTR_TARGET] = all_targets[0]
            elif len(all_targets) > 1:
                action_data[ATTR_TARGET] = all_targets

        if prune_data and action_data:
            action_data = customize_data(action_data, envelope.delivery)
            if not raw_mode and domain and service:
                # use the service schema to remove unsupported fields or force type
                action_data = self.context.hass_api.coerce_schema(domain, service, action_data)

        return await self.call_action(envelope, qualified_action, action_data=action_data, target_data=target_data or None)

233 

234 

def customize_data(data: dict[str, Any], delivery: Delivery) -> dict[str, Any]:
    """Apply the delivery's `OPTION_DATA_KEYS_SELECT` rule to the top-level keys of `data`.

    When the option is unset (or falsy) the very same dict is handed back, otherwise a
    new dict holding only the keys matched by the selection rule. In both cases an
    empty nested `data` entry is dropped from the dict being returned, which means the
    caller's dict is modified in place when no selection rule applies.
    """
    if not data:
        return data

    selection = delivery.options.get(OPTION_DATA_KEYS_SELECT)
    if selection:
        rule = SelectionRule(selection)
        result: dict[str, Any] = {key: value for key, value in data.items() if rule.match(key)}
    else:
        result = data

    # tidy up empty nested `data` maps
    if ATTR_DATA in result and not result[ATTR_DATA]:
        result.pop(ATTR_DATA)

    return result

253 

254 

255@dataclass 

256class MiniEnvelope: 

257 action_data: dict[str, Any] = field(default_factory=dict) 

258 target_data: dict[str, Any] | None = None 

259 

260 

def script(
    qualified_action: str | None,
    core_action_data: dict[str, Any],
    data: dict[str, Any],
    target: Target,
    delivery: Delivery,
    hass_api: HomeAssistantAPI,
) -> list[MiniEnvelope]:
    """Customize `data` for script integration

    For `script.turn_on` / `script.turn_off` everything is passed as script `variables`:
    the core action data, overlaid with any nested `variables` from `data`, overlaid
    with the remaining keys of `data`. The result is coerced against the script
    service schema and targeted at the script entities of `target`.

    For any other script action the core action data and `data` are merged into a
    flat payload with no target data.

    Neither `core_action_data` nor `data` is modified.
    """
    if qualified_action in ("script.turn_on", "script.turn_off"):
        # copies, so that popping `variables` and merging never touches the caller's dicts
        extra: dict[str, Any] = dict(data)
        variables: dict[str, Any] = dict(core_action_data)
        nested_variables = extra.pop("variables", None)
        if nested_variables:
            variables.update(nested_variables)
        variables.update(extra)
        action_data: dict[str, Any] = {"variables": variables}
        # NOTE(review): the return value is deliberately left unused, as before. Applying it
        # would filter the single top-level `variables` key by the selection rule - confirm intent.
        customize_data(action_data, delivery)
        action_data = hass_api.coerce_schema("script", qualified_action.removeprefix("script."), action_data)
        return [MiniEnvelope(action_data=action_data, target_data={ATTR_ENTITY_ID: target.domain_entity_ids("script")})]

    # directly invoked script: flat payload, `data` wins over the core action data
    return [MiniEnvelope(action_data={**core_action_data, **data})]

286 

287 

def ntfy(
    core_action_data: dict[str, Any],
    data: dict[str, Any],
    target: Target,
    delivery: Delivery,
    hass_api: HomeAssistantAPI,
) -> list[MiniEnvelope]:
    """Customize `data` for ntfy integration

    Merges the core action data with `data`, applies the delivery's data key selection,
    coerces the result against the `ntfy.publish` schema, then:

    * maps a known priority name to its `PRIORITY_VALUES` entry
    * uses the media snapshot URL as `attach` and the first action's URL as `click`,
      unless those keys were supplied explicitly
    * emits one call per e-mail address and per phone number of the target (a single
      phone number is folded into a single existing call), unless `email` / `call`
      were supplied explicitly
    * attaches the target's notify entities to the only call, or adds a separate
      call for them

    Always returns at least one mini envelope.
    """
    results: list[MiniEnvelope] = []
    # keep the returned dict: with a key selection configured, the pruned data is a new dict
    action_data: dict[str, Any] = customize_data({**core_action_data, **data}, delivery)
    action_data = hass_api.coerce_schema("ntfy", "publish", action_data)

    if ATTR_PRIORITY in action_data and action_data[ATTR_PRIORITY] in PRIORITY_VALUES:
        action_data[ATTR_PRIORITY] = PRIORITY_VALUES[action_data[ATTR_PRIORITY]]

    media = action_data.pop(ATTR_MEDIA, {})
    if media and media.get(ATTR_MEDIA_SNAPSHOT_URL) and "attach" not in action_data:
        action_data["attach"] = media.get(ATTR_MEDIA_SNAPSHOT_URL)
    actions = action_data.pop(ATTR_ACTIONS, [])
    if actions:
        first_action = actions[0]
        if first_action.get(ATTR_ACTION_URL) and "click" not in action_data:
            action_data["click"] = first_action.get(ATTR_ACTION_URL)

    if target.email and "email" not in action_data:
        # nothing has been queued yet at this point, so every address gets its own call
        results.extend(MiniEnvelope(action_data={**action_data, "email": email}) for email in target.email)
    if target.phone and "call" not in action_data:
        for phone in target.phone:
            if len(results) == 1 and len(target.phone) == 1:
                # one e-mail call and one phone number: combine them into a single publish
                results[0].action_data["call"] = phone
            else:
                results.append(MiniEnvelope(action_data={**action_data, "call": phone}))
    notify_entities = target.domain_entity_ids(NOTIFY_DOMAIN)
    if not results or notify_entities:
        if len(results) == 1:
            results[0].target_data = {"entity_id": notify_entities}
        else:
            results.append(MiniEnvelope(action_data=dict(action_data), target_data={"entity_id": notify_entities}))

    return results

338 

339 

def notify_events(
    message: str | None,
    title: str | None,
    core_action_data: dict[str, Any],
    data: dict[str, Any],
    delivery: Delivery,
) -> list[MiniEnvelope]:
    """Customize `data` for notify_events integration

    Builds a single call whose payload is the message plus a nested `data` map
    assembled from: the title, any nested `data` supplied by the caller (caller keys
    win over the title), a priority translated to the names in `priority_mapping`
    (or passed through if it already is one of those names), `token`, `level`, and an
    `images` entry for the media snapshot URL. The nested `data` map is omitted when
    it would be empty.

    Neither `core_action_data` nor `data`, nor anything nested inside them, is modified.
    """
    # keep the returned dict: with a key selection configured, the pruned data is a new dict
    input_data: dict[str, Any] = customize_data({**core_action_data, **data}, delivery)
    priority_mapping: dict[str, str] = {
        PRIORITY_MINIMUM: "lowest",
        PRIORITY_LOW: "low",
        PRIORITY_MEDIUM: "normal",
        PRIORITY_HIGH: "high",
        PRIORITY_CRITICAL: "highest",
    }

    # assembled separately and as a fresh dict, so the caller's nested `data` is never aliased
    extra: dict[str, Any] = {}
    if title:
        extra[ATTR_TITLE] = title

    if ATTR_DATA in input_data:
        # notify_events is schema-less for action; merge rather than replace so the title survives
        extra.update(input_data[ATTR_DATA])

    if ATTR_PRIORITY in input_data and input_data[ATTR_PRIORITY] in PRIORITY_VALUES:
        extra[ATTR_PRIORITY] = priority_mapping.get(input_data[ATTR_PRIORITY])
    elif ATTR_PRIORITY in input_data and input_data[ATTR_PRIORITY] in priority_mapping.values():
        extra[ATTR_PRIORITY] = input_data[ATTR_PRIORITY]

    if "token" in input_data:
        extra["token"] = input_data["token"]
    if "level" in input_data:
        extra["level"] = input_data["level"]

    # `media` may be present but empty/None, hence the `or {}`
    snapshot_url = (input_data.get(ATTR_MEDIA) or {}).get(ATTR_MEDIA_SNAPSHOT_URL)
    if snapshot_url and "images" not in input_data:
        # new list, so an `images` list supplied inside the caller's `data` is not extended in place
        extra["images"] = [*extra.get("images", []), {"url": snapshot_url}]

    action_data: dict[str, Any] = {ATTR_MESSAGE: message}
    if extra:
        action_data[ATTR_DATA] = extra

    return [MiniEnvelope(action_data=action_data)]