Coverage for custom_components/supernotify/envelope.py: 17%

163 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-01-07 15:35 +0000

1import copy 

2import logging 

3import string 

4import time 

5import typing 

6import uuid 

7from typing import Any, cast 

8 

9from anyio import Path 

10from homeassistant.components.notify.const import ATTR_MESSAGE, ATTR_TITLE 

11from jinja2 import TemplateError 

12 

13from . import ( 

14 ATTR_MEDIA, 

15 ATTR_MESSAGE_HTML, 

16 ATTR_PRIORITY, 

17 ATTR_TIMESTAMP, 

18 OPTION_MESSAGE_USAGE, 

19 OPTION_SIMPLIFY_TEXT, 

20 OPTION_STRIP_URLS, 

21 PRIORITY_MEDIUM, 

22) 

23from .common import DupeCheckable 

24from .context import Context 

25from .media_grab import grab_image 

26from .model import ( 

27 ConditionVariables, 

28 DeliveryCustomization, 

29 MessageOnlyPolicy, 

30 SuppressionReason, 

31 Target, 

32 TargetRequired, 

33 TransportFeature, 

34) 

35 

36if typing.TYPE_CHECKING: 

37 from custom_components.supernotify.common import CallRecord 

38 

39 from .delivery import Delivery 

40 from .notification import Notification 

41 from .scenario import Scenario 

42 

43_LOGGER = logging.getLogger(__name__) 

44 

45HASH_PREP_TRANSLATION_TABLE = table = str.maketrans("", "", string.punctuation + string.digits) 

46 

47 

48class Envelope(DupeCheckable): 

49 """Wrap a notification with a specific set of targets and service data possibly customized for those targets""" 

50 

51 def __init__( 

52 self, 

53 delivery: "Delivery", 

54 notification: "Notification | None" = None, 

55 target: Target | None = None, # targets only for this delivery 

56 data: dict[str, Any] | None = None, 

57 context: Context | None = None, # notification data customized for this delivery 

58 ) -> None: 

59 self.target: Target = target or Target() 

60 self.context: Context | None = context 

61 self.delivery_name: str = delivery.name 

62 self.delivery: Delivery = delivery 

63 self._notification = notification 

64 self.notification_id = None 

65 self.media = None 

66 self.action_groups = None 

67 self.priority = PRIORITY_MEDIUM 

68 self._message: str | None = None 

69 self._title: str | None = None 

70 self.message_html: str | None = None 

71 self.data: dict[str, Any] = {} 

72 self.actions: list[dict[str, Any]] = [] 

73 if notification: 

74 delivery_config_data: dict[str, Any] = notification.delivery_data(delivery.name) 

75 self._enabled_scenarios: dict[str, Scenario] = notification.enabled_scenarios 

76 self._message = notification.message 

77 self._title = notification._title 

78 self.id = f"{notification.id}_{self.delivery_name}" 

79 else: 

80 delivery_config_data = {} 

81 self._enabled_scenarios = {} 

82 self.id = str(uuid.uuid1()) 

83 if data: 

84 self.data = copy.deepcopy(delivery_config_data) if delivery_config_data else {} 

85 self.data |= data 

86 else: 

87 self.data = delivery_config_data if delivery_config_data else {} 

88 

89 if notification: 

90 self.notification_id = notification.id 

91 self.media = notification.media 

92 self.action_groups = notification.action_groups 

93 self.actions = notification.actions 

94 self.priority = self.data.get(ATTR_PRIORITY, notification.priority) 

95 self.message_html = self.data.get(ATTR_MESSAGE_HTML, notification.message_html) 

96 if notification and hasattr(notification, "condition_variables"): # yeuchh 

97 self.condition_variables = notification.condition_variables 

98 else: 

99 self.condition_variables = ConditionVariables() 

100 

101 self.message = self._compute_message() 

102 self.title = self._compute_title() 

103 

104 self.delivered: int = 0 

105 self.error_count: int = 0 

106 self.skipped: int = 0 

107 self.skip_reason: SuppressionReason | None = None 

108 self.calls: list[CallRecord] = [] 

109 self.failed_calls: list[CallRecord] = [] 

110 self.delivery_error: list[str] | None = None 

111 

112 async def grab_image(self) -> Path | None: 

113 """Grab an image from a camera, snapshot URL, MQTT Image etc""" 

114 image_path: Path | None = None 

115 if self._notification: 

116 image_path = await grab_image(self._notification, self.delivery_name, self._notification.context) 

117 return image_path 

118 

119 def core_action_data(self, force_message: bool = True) -> dict[str, Any]: 

120 """Build the core set of `service_data` dict to pass to underlying notify service""" 

121 # TODO: remove all logic, so only called to pre-populate `data` 

122 data: dict[str, Any] = {} 

123 # message is mandatory for notify platform 

124 if self.message is None: 

125 if force_message: 

126 data[ATTR_MESSAGE] = "" 

127 else: 

128 data[ATTR_MESSAGE] = self.message 

129 timestamp = self.data.get(ATTR_TIMESTAMP) 

130 if timestamp and ATTR_MESSAGE in data: 

131 data[ATTR_MESSAGE] = f"{data[ATTR_MESSAGE]} [{time.strftime(timestamp, time.localtime())}]" 

132 if self.title is not None: 

133 data[ATTR_TITLE] = self.title 

134 return data 

135 

136 def contents(self, minimal: bool = True, **_kwargs: Any) -> dict[str, typing.Any]: 

137 exclude_attrs: list[str] = ["_notification", "context"] 

138 if minimal: 

139 exclude_attrs.append("delivery") 

140 features: TransportFeature = self.delivery.transport.supported_features 

141 if not features & TransportFeature.ACTIONS: 

142 exclude_attrs.extend(["actions", "action_groups"]) 

143 if not features & TransportFeature.IMAGES and not features & TransportFeature.VIDEO: 

144 exclude_attrs.append(ATTR_MEDIA) 

145 if not features & TransportFeature.MESSAGE: 

146 exclude_attrs.extend(["message_html", "message"]) 

147 if not features & TransportFeature.TITLE: 

148 exclude_attrs.append("title") 

149 if self.delivery.target_required == TargetRequired.NEVER: 

150 exclude_attrs.append("target") 

151 

152 json_ready = {k: v for k, v in self.__dict__.items() if k not in exclude_attrs and not k.startswith("_")} 

153 json_ready["calls"] = [call.contents() for call in self.calls] 

154 json_ready["failedcalls"] = [call.contents() for call in self.failed_calls] 

155 return json_ready 

156 

157 def __eq__(self, other: Any | None) -> bool: 

158 """Specialized equality check for subset of attributes""" 

159 if other is None or not isinstance(other, Envelope): 

160 return False 

161 return bool( 

162 self.target == other.target 

163 and self.delivery_name == other.delivery_name 

164 and self.data == other.data 

165 and self.notification_id == other.notification_id 

166 ) 

167 

168 def __repr__(self) -> str: 

169 """Return a concise string representation of the Envelope. 

170 

171 The returned string includes the envelope's message, title, and delivery name 

172 in the form: Envelope(message=<message>,title=<title>,delivery=<delivery_name>). 

173 

174 Primarily intended for debugging and logging; note that attribute values are 

175 inserted directly and may not be quoted or escaped. 

176 """ 

177 return f"Envelope(message={self.message},title={self.title},delivery={self.delivery_name})" 

178 

179 def _compute_title(self, ignore_usage: bool = False) -> str | None: 

180 # message and title reverse the usual defaulting, delivery config overrides runtime call 

181 

182 title: str | None = None 

183 if self.delivery is None: 

184 title = self._title 

185 else: 

186 message_usage = self.delivery.option_str(OPTION_MESSAGE_USAGE) 

187 if not ignore_usage and message_usage.upper() in (MessageOnlyPolicy.USE_TITLE, MessageOnlyPolicy.COMBINE_TITLE): 

188 title = None 

189 else: 

190 title = self.delivery.title if self.delivery.title is not None else self._title 

191 if ( 

192 self.delivery.option_bool(OPTION_SIMPLIFY_TEXT) is True 

193 or self.delivery.option_bool(OPTION_STRIP_URLS) is True 

194 ): 

195 title = self.delivery.transport.simplify(title, strip_urls=self.delivery.option_bool(OPTION_STRIP_URLS)) 

196 title = self._render_scenario_templates(title, "title_template", "notification_title") 

197 if title is None: 

198 return None 

199 return str(title) 

200 

201 def _compute_message(self) -> str | None: 

202 # message and title reverse the usual defaulting, delivery config overrides runtime call 

203 

204 msg: str | None = None 

205 if self.delivery is None: 

206 msg = self._message 

207 else: 

208 msg = self.delivery.message if self.delivery.message is not None else self._message 

209 message_usage: str = str(self.delivery.option_str(OPTION_MESSAGE_USAGE)) 

210 if message_usage.upper() == MessageOnlyPolicy.USE_TITLE: 

211 title = self._compute_title(ignore_usage=True) 

212 if title: 

213 msg = title 

214 elif message_usage.upper() == MessageOnlyPolicy.COMBINE_TITLE: 

215 title = self._compute_title(ignore_usage=True) 

216 if title: 

217 msg = f"{title} {msg}" 

218 if self.delivery.option_bool(OPTION_SIMPLIFY_TEXT) is True or self.delivery.option_bool(OPTION_STRIP_URLS) is True: 

219 msg = self.delivery.transport.simplify(msg, strip_urls=self.delivery.option_bool(OPTION_STRIP_URLS)) 

220 

221 msg = self._render_scenario_templates(msg, "message_template", "notification_message") 

222 if msg is None: # keep mypy happy 

223 return None 

224 return str(msg) 

225 

226 def _render_scenario_templates(self, original: str | None, template_field: str, matching_ctx: str) -> str | None: 

227 """Apply templating to a field, like message or title""" 

228 rendered = original if original is not None else "" 

229 delivery_configs: list[DeliveryCustomization] = list( 

230 filter(None, (scenario.delivery_config(self.delivery.name) for scenario in self._enabled_scenarios.values())) 

231 ) 

232 template_formats: list[str] = [ 

233 dc.data_value(template_field) 

234 for dc in delivery_configs 

235 if dc is not None and dc.data_value(template_field) is not None 

236 ] 

237 if template_formats and self.context: 

238 if self.condition_variables: 

239 context_vars: dict[str, Any] = cast("dict[str,Any]", self.condition_variables.as_dict()) 

240 else: 

241 context_vars = {} 

242 for template_format in template_formats: 

243 context_vars[matching_ctx] = rendered 

244 try: 

245 template = self.context.hass_api.template(template_format) 

246 rendered = template.async_render(variables=context_vars) 

247 except TemplateError as e: 

248 self.error_count += 1 

249 _LOGGER.warning( 

250 "SUPERNOTIFY Rendering template %s for %s failed: %s", template_field, self.delivery.name, e 

251 ) 

252 return rendered 

253 return original 

254 

255 # DupeCheckable implementation 

256 

257 def hash(self) -> int: 

258 """Alpha hash to reduce noise from messages with timestamps or incrementing counts""" 

259 

260 def alphaize(v: str | None) -> str | None: 

261 return v.translate(HASH_PREP_TRANSLATION_TABLE) if v else v 

262 

263 return hash((alphaize(self._message), alphaize(self.delivery.name), self.target.hash_resolved(), alphaize(self._title)))