Coverage for custom_components/supernotify/envelope.py: 89%

188 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-01 15:06 +0000

import copy
import logging
import string
import time
import typing
import uuid
from typing import Any, cast

from homeassistant.components.notify.const import ATTR_MESSAGE, ATTR_TITLE
from homeassistant.exceptions import TemplateError as HassTemplateError
from homeassistant.helpers.template import is_template_string
from jinja2 import TemplateError

from .common import DupeCheckable
from .const import (
    ATTR_MEDIA,
    ATTR_MESSAGE_HTML,
    ATTR_PRIORITY,
    ATTR_SPOKEN_MESSAGE,
    ATTR_TIMESTAMP,
    OPTION_MESSAGE_USAGE,
    OPTION_SIMPLIFY_TEXT,
    OPTION_STRIP_URLS,
    PRIORITY_MEDIUM,
)
from .media_grab import grab_image
from .model import (
    ConditionVariables,
    DeliveryCustomization,
    MessageOnlyPolicy,
    SuppressionReason,
    Target,
    TargetRequired,
    TransportFeature,
)

35 

36if typing.TYPE_CHECKING: 

37 from anyio import Path 

38 

39 from custom_components.supernotify.common import CallRecord 

40 

41 from .context import Context 

42 from .delivery import Delivery 

43 from .notification import Notification 

44 from .scenario import Scenario 

45 

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Deletion table for `str.translate`: removes every ASCII punctuation character
# and digit, so text that differs only in numbers or punctuation translates to
# the same string. `table` is bound to the same object as a module-level alias.
HASH_PREP_TRANSLATION_TABLE = table = str.maketrans("", "", string.punctuation + string.digits)

49 

50 

class Envelope(DupeCheckable):
    """Wrap a notification with a specific set of targets and service data possibly customized for those targets.

    An envelope pairs one delivery with the target, data, message and title to
    use for it, and carries the outcome counters and call records for that
    delivery.

    Equality (``__eq__``) compares target, delivery name, data and notification
    id only. Because ``__eq__`` is overridden without ``__hash__``, instances are
    not hashable; the ``hash()`` method below is the duplicate-check hash.
    """

    def __init__(
        self,
        delivery: Delivery,
        notification: Notification | None = None,
        target: Target | None = None,  # targets only for this delivery
        data: dict[str, Any] | None = None,  # notification data customized for this delivery
        context: Context | None = None,
    ) -> None:
        """Create an envelope for a single delivery.

        Args:
            delivery: the delivery this envelope is for.
            notification: originating notification, if any. Supplies message,
                title, media, actions, priority, enabled scenarios and the
                per-delivery data override.
            target: targets only for this delivery; an empty ``Target`` when omitted.
            data: notification data customized for this delivery. It is deep
                copied, then overlaid with the notification's per-delivery override.
            context: provides the template API used to render message, title
                and data templates; without it no templates are rendered.
        """
        self.target: Target = target or Target()
        self.context: Context | None = context
        self.delivery_name: str = delivery.name
        self.delivery: Delivery = delivery
        self._notification = notification
        self.notification_id = None
        self.media = None
        self.action_groups = None
        self.priority = PRIORITY_MEDIUM
        self._message: str | None = None
        self._title: str | None = None
        self.message_html: str | None = None
        self.data: dict[str, Any] = {}
        self.actions: list[dict[str, Any]] = []
        if notification:
            delivery_config_data: dict[str, Any] = notification.delivery_data(delivery)
            self._enabled_scenarios: dict[str, Scenario] = notification.enabled_scenarios
            # A message/title in the per-delivery override replaces the notification's own,
            # and is popped so that it is not merged into `data` as well.
            self._message = delivery_config_data.pop(ATTR_MESSAGE, notification.message)
            self._title = delivery_config_data.pop(ATTR_TITLE, notification._title)
            self.id = f"{notification.id}_{self.delivery_name}"
        else:
            delivery_config_data = {}
            self._enabled_scenarios = {}
            self.id = str(uuid.uuid1())
        if data:
            self.data = copy.deepcopy(data)
            if delivery_config_data:
                # notification-level delivery override wins over scenario/delivery data
                self.data |= delivery_config_data
        else:
            self.data = delivery_config_data or {}

        if notification:
            self.notification_id = notification.id
            self.media = notification.media
            self.action_groups = notification.action_groups
            self.actions = notification.actions
            self.priority = self.data.get(ATTR_PRIORITY, notification.priority)
            self.message_html = self.data.get(ATTR_MESSAGE_HTML, notification.message_html)
        if notification and hasattr(notification, "condition_variables"):  # yeuchh
            self.condition_variables = notification.condition_variables
        else:
            self.condition_variables = ConditionVariables()

        # Outcome bookkeeping has to exist before the message and title are computed:
        # a failing scenario template increments `error_count` during that computation.
        self.delivered: int = 0
        self.error_count: int = 0
        self.skipped: int = 0
        self.skip_reason: SuppressionReason | None = None
        self.calls: list[CallRecord] = []
        self.failed_calls: list[CallRecord] = []
        self.delivery_error: list[str] | None = None

        self.message = self._compute_message()
        self.title = self._compute_title()

    async def grab_image(self) -> Path | None:
        """Grab an image from a camera, snapshot URL, MQTT Image etc.

        Returns:
            Whatever the module-level ``grab_image`` helper returns for the
            originating notification and this delivery, or ``None`` when the
            envelope has no notification.
        """
        if not self._notification:
            return None
        return await grab_image(self._notification, self.delivery, self._notification.context)

    def core_action_data(self, force_message: bool = True) -> dict[str, Any]:
        """Build the core set of `service_data` dict to pass to underlying notify service.

        Args:
            force_message: when the envelope has no message, still emit the
                message key with an empty string instead of leaving it out.

        Returns:
            A new dict with the message and, if set, the title. When the
            envelope data holds a timestamp format and a message is present,
            the current local time in that format is appended in brackets.
        """
        # TODO: remove all logic, so only called to pre-populate `data`
        data: dict[str, Any] = {}
        # message is mandatory for notify platform
        if self.message is not None:
            data[ATTR_MESSAGE] = self.message
        elif force_message:
            data[ATTR_MESSAGE] = ""
        timestamp = self.data.get(ATTR_TIMESTAMP)
        if timestamp and ATTR_MESSAGE in data:
            data[ATTR_MESSAGE] = f"{data[ATTR_MESSAGE]} [{time.strftime(timestamp, time.localtime())}]"
        if self.title is not None:
            data[ATTR_TITLE] = self.title
        return data

    def contents(self, minimal: bool = True, **_kwargs: Any) -> dict[str, typing.Any]:
        """Return the envelope's public attributes as a dict.

        Private (underscore-prefixed) attributes, the context and the condition
        variables are always left out. `data` is passed through
        `_resolve_data_templates` and call records are replaced by their own
        `contents()`.

        Args:
            minimal: also leave out the delivery object, plus any attributes
                the delivery's transport does not support (actions, media,
                message, title) and the target when the delivery never
                requires one.
            **_kwargs: accepted and ignored.
        """
        exclude_attrs: list[str] = ["_notification", "context", "condition_variables"]
        if minimal:
            exclude_attrs.append("delivery")
            features: TransportFeature = self.delivery.transport.supported_features
            if not features & TransportFeature.ACTIONS:
                exclude_attrs.extend(["actions", "action_groups"])
            if not features & TransportFeature.IMAGES and not features & TransportFeature.VIDEO:
                exclude_attrs.append(ATTR_MEDIA)
            if not features & TransportFeature.MESSAGE:
                exclude_attrs.extend(["message_html", "message"])
            if not features & TransportFeature.TITLE:
                exclude_attrs.append("title")
            if self.delivery.target_required == TargetRequired.NEVER:
                exclude_attrs.append("target")

        json_ready = {k: v for k, v in self.__dict__.items() if k not in exclude_attrs and not k.startswith("_")}
        json_ready["data"] = self._resolve_data_templates(self.data)
        json_ready["calls"] = [call.contents() for call in self.calls]
        # NOTE(review): the key written here is "failedcalls", so the raw `failed_calls`
        # attribute copied from __dict__ above is still present alongside it - confirm
        # which key consumers read before renaming or dropping either.
        json_ready["failedcalls"] = [call.contents() for call in self.failed_calls]
        return json_ready

    def __eq__(self, other: Any | None) -> bool:
        """Specialized equality check for subset of attributes.

        Two envelopes are equal when target, delivery name, data and
        notification id all match; anything that is not an Envelope is unequal.
        """
        if not isinstance(other, Envelope):
            return False
        return bool(
            self.target == other.target
            and self.delivery_name == other.delivery_name
            and self.data == other.data
            and self.notification_id == other.notification_id
        )

    def __repr__(self) -> str:
        """Return a concise string representation of the Envelope.

        The returned string includes the envelope's message, title, and delivery name
        in the form: Envelope(message=<message>,title=<title>,delivery=<delivery_name>).

        Primarily intended for debugging and logging; note that attribute values are
        inserted directly and may not be quoted or escaped.
        """
        return f"Envelope(message={self.message},title={self.title},delivery={self.delivery_name})"

    def _compute_title(self, ignore_usage: bool = False) -> str | None:
        """Work out the title to send for this delivery.

        Args:
            ignore_usage: compute the title even when the delivery's message
                usage option says the title is carried by the message instead.

        Returns:
            ``None`` when the message usage option is USE_TITLE or COMBINE_TITLE
            (and `ignore_usage` is false), otherwise the delivery's own title if
            it has one, else the notification/override title, after optional
            simplification and scenario title templates.
        """
        # message and title reverse the usual defaulting, delivery config overrides runtime call
        message_usage: str = str(self.delivery.option_str(OPTION_MESSAGE_USAGE))
        if not ignore_usage and message_usage.upper() in (MessageOnlyPolicy.USE_TITLE, MessageOnlyPolicy.COMBINE_TITLE):
            # the title travels inside the message for these policies
            return None

        title: str | None = self.delivery.title if self.delivery.title is not None else self._title
        if self.delivery.option_bool(OPTION_SIMPLIFY_TEXT) is True or self.delivery.option_bool(OPTION_STRIP_URLS) is True:
            title = self.delivery.transport.simplify(title, strip_urls=self.delivery.option_bool(OPTION_STRIP_URLS))
        title = self._render_scenario_templates(title, "title_template", "notification_title")
        if title is None:
            return None
        return str(title)

    def _spoken_message(self) -> str | None:
        """Alternative message only for spoken voice transports.

        Returns the notification's spoken-message extra data as a string when
        it is present and the delivery's transport has the SPOKEN feature,
        otherwise ``None``.
        """
        if (
            self._notification
            and self._notification.extra_data
            and ATTR_SPOKEN_MESSAGE in self._notification.extra_data
            and self.delivery.transport.supported_features & TransportFeature.SPOKEN
        ):
            return str(self._notification.extra_data[ATTR_SPOKEN_MESSAGE])
        return None

    def _compute_message(self) -> str | None:
        """Work out the message to send for this delivery.

        Steps, in order: pick the delivery's own message, else the
        notification/override message; prefer a spoken message where one
        applies; render the message if it is itself a template; apply the
        delivery's message usage option (replace with, or prefix by, the
        title); optionally simplify; finally apply scenario message templates.
        """
        # message and title reverse the usual defaulting, delivery config overrides runtime call

        # self._message could be top level `message` or `message` set in delivery override
        msg: str | None = self.delivery.message if self.delivery.message is not None else self._message
        msg = self._spoken_message() or msg

        if msg and self.context and is_template_string(msg):
            try:
                context_vars = cast("dict[str,Any]", self.condition_variables.as_dict()) if self.condition_variables else {}
                template = self.context.hass_api.template(msg)
                msg = template.async_render(variables=context_vars)
            except Exception as e:
                # best effort: on any failure the unrendered message is kept
                _LOGGER.warning("SUPERNOTIFY Rendering delivery message template for %s failed: %s", self.delivery_name, e)

        message_usage: str = str(self.delivery.option_str(OPTION_MESSAGE_USAGE))
        if message_usage.upper() == MessageOnlyPolicy.USE_TITLE:
            title = self._compute_title(ignore_usage=True)
            if title:
                msg = title
        elif message_usage.upper() == MessageOnlyPolicy.COMBINE_TITLE:
            title = self._compute_title(ignore_usage=True)
            if title:
                # with no message at all the title stands alone, rather than
                # being joined to the text "None"
                msg = title if msg is None else f"{title} {msg}"

        if self.delivery.option_bool(OPTION_SIMPLIFY_TEXT) is True or self.delivery.option_bool(OPTION_STRIP_URLS) is True:
            msg = self.delivery.transport.simplify(msg, strip_urls=self.delivery.option_bool(OPTION_STRIP_URLS))

        msg = self._render_scenario_templates(msg, "message_template", "notification_message")
        if msg is None:  # keep mypy happy
            return None
        return str(msg)

    def _render_scenario_templates(self, original: str | None, template_field: str, matching_ctx: str) -> str | None:
        """Apply templating to a field, like message or title.

        Every enabled scenario that has a delivery config for this delivery
        with a value for `template_field` contributes one template. Templates
        are applied in turn, each seeing the previous result under the
        `matching_ctx` variable (a missing original is presented as "").

        Args:
            original: the field value before templating.
            template_field: name of the delivery config data value that holds
                the template.
            matching_ctx: template variable name under which the current value
                is exposed.

        Returns:
            `original` untouched when there are no templates or no context,
            otherwise the result of the last template that rendered. A template
            that fails is logged, counted in `error_count` and skipped.
        """
        delivery_configs: list[DeliveryCustomization] = [
            dc
            for scenario in self._enabled_scenarios.values()
            if (dc := scenario.delivery_config(self.delivery.name))
        ]
        template_formats: list[str] = [
            template_format
            for dc in delivery_configs
            if (template_format := dc.data_value(template_field)) is not None
        ]
        if not template_formats or not self.context:
            return original

        rendered = original if original is not None else ""
        if self.condition_variables:
            context_vars: dict[str, Any] = cast("dict[str,Any]", self.condition_variables.as_dict())
        else:
            context_vars = {}
        for template_format in template_formats:
            context_vars[matching_ctx] = rendered
            try:
                template = self.context.hass_api.template(template_format)
                rendered = template.async_render(variables=context_vars)
            except (TemplateError, HassTemplateError) as e:
                # NOTE(review): presumably `hass_api.template()` returns a Home Assistant
                # Template, whose render failures are homeassistant.exceptions.TemplateError
                # rather than jinja2's TemplateError, so both are caught - confirm.
                self.error_count += 1
                _LOGGER.warning(
                    "SUPERNOTIFY Rendering template %s for %s failed: %s", template_field, self.delivery.name, e
                )
        return rendered

    # DupeCheckable implementation

    def hash(self) -> int:
        """Alpha hash to reduce noise from messages with timestamps or incrementing counts.

        Hashes the message (spoken message if applicable), delivery name and
        title with punctuation and digits stripped, together with the target's
        resolved hash.
        """

        def alphaize(v: str | None) -> str | None:
            # empty strings and None pass through unchanged
            return v.translate(HASH_PREP_TRANSLATION_TABLE) if v else v

        message: str | None = self._spoken_message() or self._message
        return hash((alphaize(message), alphaize(self.delivery.name), self.target.hash_resolved(), alphaize(self._title)))

    def _resolve_data_templates(self, data: dict[str, Any]) -> dict[str, Any]:
        """Resolve Jinja2 templates in data dict for archive readability.

        Returns a new dict in which each top-level string value that is a
        template is replaced by its rendered value, with the raw template
        string preserved alongside as <key>_template for debugging. Values
        that are not template strings, or that fail to render, are carried
        over unchanged; nested containers are not searched.

        When `data` is empty or there is no context, `data` itself is returned.
        """
        if not data or not self.context:
            return data
        resolved: dict[str, Any] = {}
        context_vars = cast("dict[str, Any]", self.condition_variables.as_dict()) if self.condition_variables else {}
        for key, value in data.items():
            resolved[key] = value
            # same template detection as used for the message itself
            if isinstance(value, str) and is_template_string(value):
                try:
                    rendered = self.context.hass_api.template(value).async_render(variables=context_vars)
                except Exception as e:
                    # best effort: the raw value stays in place
                    _LOGGER.debug("SUPERNOTIFY Could not resolve template for %s in %s: %s", key, self.delivery_name, e)
                else:
                    resolved[key] = rendered
                    resolved[f"{key}_template"] = value
        return resolved