Coverage for custom_components/supernotify/transports/generic.py: 18%

169 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-01-07 15:35 +0000

1import logging 

2from dataclasses import dataclass, field 

3from typing import Any 

4 

5from homeassistant.components.notify import DOMAIN as NOTIFY_DOMAIN 

6from homeassistant.components.notify.const import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET 

7 

8# ATTR_VARIABLES from script.const has import issues 

9from homeassistant.const import ATTR_ENTITY_ID 

10 

11from custom_components.supernotify import ( 

12 ATTR_ACTION_URL, 

13 ATTR_MEDIA_SNAPSHOT_URL, 

14 ATTR_PRIORITY, 

15 OPTION_DATA_KEYS_SELECT, 

16 OPTION_GENERIC_DOMAIN_STYLE, 

17 OPTION_MESSAGE_USAGE, 

18 OPTION_RAW, 

19 OPTION_SIMPLIFY_TEXT, 

20 OPTION_STRIP_URLS, 

21 OPTION_TARGET_CATEGORIES, 

22 PRIORITY_VALUES, 

23 SELECT_INCLUDE, 

24 TRANSPORT_GENERIC, 

25) 

26from custom_components.supernotify.common import ensure_list 

27from custom_components.supernotify.delivery import Delivery 

28from custom_components.supernotify.envelope import Envelope 

29from custom_components.supernotify.model import ( 

30 DebugTrace, 

31 MessageOnlyPolicy, 

32 SelectionRule, 

33 Target, 

34 TargetRequired, 

35 TransportConfig, 

36 TransportFeature, 

37) 

38from custom_components.supernotify.transport import ( 

39 Transport, 

40) 

41 

_LOGGER = logging.getLogger(__name__)

# Per-domain allow-list of action data keys. `customize_data` turns the list
# for a domain into an include-only selection rule whenever the delivery has
# no explicit key selection configured.
DATA_FIELDS_ALLOWED: dict[str, list[str]] = {
    "light": [
        # transitions and colour
        "transition",
        "rgb_color",
        "color_temp_kelvin",
        # relative brightness
        "brightness_pct",
        "brightness_step_pct",
        "effect",
        # remaining colour models
        "rgbw_color",
        "rgbww_color",
        "color_name",
        "hs_color",
        "xy_color",
        "color_temp",
        # absolute brightness
        "brightness",
        "brightness_step",
        "white",
        "profile",
        "flash",
    ],
    "siren": [
        "tone",
        "duration",
        "volume_level",
    ],
    "mqtt": [
        "topic",
        "payload",
        "evaluate_payload",
        "qos",
        "retain",
    ],
    "script": [
        "variables",
        "wait",
        "wait_template",
    ],
    "ntfy": [
        "title",
        "message",
        "markdown",
        "tags",
        "priority",
        "click",
        "delay",
        "attach",
        "email",
        "call",
        "icon",
    ],
    "tts": [
        "cache",
        "options",
        "message",
        "language",
        "media_player_entity_id",
        "entity_id",
        "target",
    ],
}

69 

70 

class GenericTransport(Transport):
    """Call any service, including non-notify ones, like switch.turn_on or mqtt.publish"""

    name = TRANSPORT_GENERIC

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

    @property
    def supported_features(self) -> TransportFeature:
        """Declare the message and title features for this transport."""
        return TransportFeature.MESSAGE | TransportFeature.TITLE

    @property
    def default_config(self) -> TransportConfig:
        """Build a new `TransportConfig` with optional targets and the default generic options."""
        config = TransportConfig()
        config.delivery_defaults.target_required = TargetRequired.OPTIONAL
        config.delivery_defaults.options = {
            OPTION_SIMPLIFY_TEXT: False,
            OPTION_STRIP_URLS: False,
            OPTION_RAW: False,
            OPTION_MESSAGE_USAGE: MessageOnlyPolicy.STANDARD,
            OPTION_DATA_KEYS_SELECT: None,
            OPTION_GENERIC_DOMAIN_STYLE: None,
        }
        return config

    def validate_action(self, action: str | None) -> bool:
        """Accept only a domain-qualified action name (one containing a "."), warning otherwise."""
        if action is not None and "." in action:
            return True
        _LOGGER.warning("SUPERNOTIFY generic transport must have a qualified action name, e.g. notify.foo")
        return False

    async def deliver(self, envelope: Envelope, debug_trace: DebugTrace | None = None) -> bool:  # noqa: ARG002
        """Shape the envelope into action and target data for its action's domain, then call the action.

        The domain is the part of the delivery's action before the first ".",
        unless the `OPTION_GENERIC_DOMAIN_STYLE` option names another domain
        whose conventions should be followed instead.

        Returns:
            The result of `call_action`. When the domain handler produced
            several mini envelopes, one call is made per mini envelope and the
            result is True only if every call succeeded.
        """
        # inputs
        # Shallow copy: several branches below use `data` directly as the action
        # payload and add to it (mqtt `payload`), and `script()` removes keys from
        # it, none of which should leak back into the envelope's own data.
        data: dict[str, Any] = dict(envelope.data or {})
        core_action_data: dict[str, Any] = envelope.core_action_data(force_message=False)
        raw_mode: bool = envelope.delivery.options.get(OPTION_RAW, False)
        qualified_action: str | None = envelope.delivery.action
        domain: str | None = qualified_action.split(".", 1)[0] if qualified_action and "." in qualified_action else None
        equiv_domain: str | None = domain
        if envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE):
            equiv_domain = envelope.delivery.options.get(OPTION_GENERIC_DOMAIN_STYLE)
            _LOGGER.debug("SUPERNOTIFY Handling %s generic message as if it was %s", domain, equiv_domain)

        # outputs
        action_data: dict[str, Any] = {}
        target_data: dict[str, Any] | None = {}
        build_targets: bool = False
        prune_data: bool = True
        mini_envelopes: list[MiniEnvelope] = []

        if raw_mode:
            action_data = core_action_data
            action_data.update(data)
            build_targets = True
        elif equiv_domain == "notify":
            action_data = core_action_data
            if qualified_action == "notify.send_message":
                # amongst the wild west of notify handling, at least care for the modern core one
                target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
                prune_data = False
            else:
                action_data[ATTR_DATA] = data
                build_targets = True
        elif equiv_domain == "input_text":
            target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
            if "value" in data:
                action_data = {"value": data["value"]}
            else:
                action_data = {"value": core_action_data[ATTR_MESSAGE]}
        elif equiv_domain == "switch":
            target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
        elif equiv_domain == "mqtt":
            action_data = data
            if "payload" not in action_data:
                action_data["payload"] = envelope.message
                # add `payload:` with empty value for empty topic
        elif equiv_domain == "tts":
            action_data = core_action_data
            action_data.update(data)
            build_targets = True
        elif qualified_action == "ntfy.publish":
            mini_envelopes.extend(ntfy(qualified_action, core_action_data, data, envelope.target, envelope.delivery))
        elif equiv_domain in ("siren", "light"):
            target_data = {ATTR_ENTITY_ID: envelope.target.domain_entity_ids(domain)}
            action_data = data
        elif equiv_domain == "rest_command":
            action_data = data
        elif equiv_domain == "script":
            mini_envelopes.extend(script(qualified_action, core_action_data, data, envelope.target, envelope.delivery))
        else:
            action_data = core_action_data
            action_data.update(data)
            build_targets = True

        if mini_envelopes:
            # Domain handler already built complete calls; issue them one after another.
            results: list[bool] = [
                await self.call_action(
                    envelope, qualified_action, action_data=mini_envelope.action_data, target_data=mini_envelope.target_data
                )
                for mini_envelope in mini_envelopes
            ]
            return all(results)

        if build_targets:
            all_targets: list[str] = []
            if OPTION_TARGET_CATEGORIES in envelope.delivery.options:
                for category in ensure_list(envelope.delivery.options.get(OPTION_TARGET_CATEGORIES, [])):
                    all_targets.extend(envelope.target.for_category(category))
            else:
                all_targets = envelope.target.resolved_targets()
            # A lone target is passed as a scalar, several as a list, none at all omits the key.
            if len(all_targets) == 1:
                action_data[ATTR_TARGET] = all_targets[0]
            elif all_targets:
                action_data[ATTR_TARGET] = all_targets

        if prune_data and action_data:
            # Raw mode skips the built-in per-domain allow-list (domain=None).
            action_data = customize_data(action_data, domain if not raw_mode else None, envelope.delivery)

        return await self.call_action(envelope, qualified_action, action_data=action_data, target_data=target_data or None)

193 

194 

def customize_data(data: dict[str, Any], domain: str | None, delivery: Delivery) -> dict[str, Any]:
    """Filter action data down to the keys permitted for a delivery.

    The rule comes from the delivery's `OPTION_DATA_KEYS_SELECT` option when
    that is set; otherwise from the `DATA_FIELDS_ALLOWED` entry for `domain`,
    if there is one. An empty `ATTR_DATA` entry is always dropped from the
    result.

    Args:
        data: Candidate action data; never modified.
        domain: Domain used to look up the built-in allow-list, or None to skip it.
        delivery: Delivery whose options may carry an explicit key selection.

    Returns:
        `data` itself when it is empty, or when no rule applies and it has no
        `ATTR_DATA` key; otherwise a new filtered dict.
    """
    if not data:
        return data

    configured_selection = delivery.options.get(OPTION_DATA_KEYS_SELECT)
    rule: SelectionRule | None = SelectionRule(configured_selection) if configured_selection else None
    if rule is None and domain and domain in DATA_FIELDS_ALLOWED:
        rule = SelectionRule({SELECT_INCLUDE: DATA_FIELDS_ALLOWED[domain]})

    if rule is None:
        if ATTR_DATA not in data:
            # Nothing to filter and no nested data entry to tidy up.
            return data
        result: dict[str, Any] = dict(data)
    else:
        result = {key: value for key, value in data.items() if rule.match(key)}

    if ATTR_DATA in result and not result[ATTR_DATA]:
        del result[ATTR_DATA]
    return result

215 

216 

217@dataclass 

218class MiniEnvelope: 

219 action_data: dict[str, Any] = field(default_factory=dict) 

220 target_data: dict[str, Any] | None = None 

221 

222 

def script(
    qualified_action: str | None, core_action_data: dict[str, Any], data: dict[str, Any], target: Target, delivery: Delivery
) -> list[MiniEnvelope]:
    """Customize `data` for script integration

    For `script.turn_on` / `script.turn_off` everything is passed as script
    `variables` (core action data first, then any `variables` given in `data`,
    then the remaining `data` keys) and the script entities of `target` are
    addressed. Any other script action receives the core action data
    overlaid with `data` as plain action data, with no target data.

    Neither `core_action_data` nor `data` is modified.

    Returns:
        A single-element list holding the call to make.
    """
    if qualified_action in ("script.turn_on", "script.turn_off"):
        # Work on copies so the caller's dicts keep their `variables` entry and contents.
        remaining: dict[str, Any] = dict(data)
        variables: dict[str, Any] = dict(core_action_data)
        nested_variables = remaining.pop("variables", None)
        if nested_variables:
            variables.update(nested_variables)
        variables.update(remaining)
        # customize_data returns the filtered dict rather than editing in place,
        # so its result has to be the one that is sent.
        action_data = customize_data({"variables": variables}, "script", delivery)
        return [MiniEnvelope(action_data=action_data, target_data={ATTR_ENTITY_ID: target.domain_entity_ids("script")})]

    action_data = dict(core_action_data)
    action_data.update(data)
    return [MiniEnvelope(action_data=action_data)]

242 

243 

def ntfy(
    qualified_action: str | None,  # noqa: ARG001
    core_action_data: dict[str, Any],
    data: dict[str, Any],
    target: Target,
    delivery: Delivery,
) -> list[MiniEnvelope]:
    """Customize `data` for ntfy integration

    Merges the core action data with `data`, translates a known priority via
    `PRIORITY_VALUES`, fills `attach` / `click` from the snapshot and action
    URLs when they are not given explicitly, and then filters the result
    through `customize_data` for the "ntfy" domain.

    One call is produced per e-mail address and per phone number of `target`,
    except that a single address and a single number share one call. Notify
    entities of `target` are attached to that call when it is the only one,
    otherwise they get a call of their own. With no e-mail, phone or entity
    at all, one call with an empty entity list is still produced.

    Neither `core_action_data` nor `data` is modified.

    Returns:
        The calls to make, never empty.
    """
    results: list[MiniEnvelope] = []
    action_data: dict[str, Any] = dict(core_action_data)
    action_data.update(data)

    if ATTR_PRIORITY in action_data and action_data[ATTR_PRIORITY] in PRIORITY_VALUES:
        action_data[ATTR_PRIORITY] = PRIORITY_VALUES.get(action_data[ATTR_PRIORITY], 3)

    if action_data.get(ATTR_MEDIA_SNAPSHOT_URL) and "attach" not in action_data:
        action_data["attach"] = action_data.get(ATTR_MEDIA_SNAPSHOT_URL)
    if action_data.get(ATTR_ACTION_URL) and "click" not in action_data:
        action_data["click"] = action_data.get(ATTR_ACTION_URL)

    # Filter only after the URL keys above have been read: they are inputs to
    # the mapping, not part of the ntfy allow-list. customize_data returns the
    # filtered dict rather than editing in place, so the result must be kept.
    action_data = customize_data(action_data, "ntfy", delivery)

    if target.email and "email" not in action_data:
        results.extend(MiniEnvelope(action_data={**action_data, "email": email}) for email in target.email)

    if target.phone and "call" not in action_data:
        # Exactly one e-mail call so far plus exactly one phone number: share the call.
        share_single_call = len(results) == 1 and len(target.phone) == 1
        for phone in target.phone:
            if share_single_call:
                results[0].action_data["call"] = phone
            else:
                results.append(MiniEnvelope(action_data={**action_data, "call": phone}))

    notify_entities = target.domain_entity_ids(NOTIFY_DOMAIN)
    if not results or notify_entities:
        if len(results) == 1:
            results[0].target_data = {"entity_id": notify_entities}
        else:
            results.append(MiniEnvelope(action_data=dict(action_data), target_data={"entity_id": notify_entities}))

    return results