Coverage for custom_components/supernotify/notification.py: 92%

452 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-21 23:31 +0000

1import datetime as dt 

2import logging 

3import string 

4import uuid 

5from traceback import format_exception 

6from typing import TYPE_CHECKING, Any 

7 

8import voluptuous as vol 

9from homeassistant.components.notify.const import ATTR_DATA 

10from homeassistant.const import CONF_ENABLED, CONF_TARGET, STATE_HOME, STATE_NOT_HOME 

11from jinja2 import TemplateError 

12from voluptuous import humanize 

13 

14from custom_components.supernotify import ( 

15 ACTION_DATA_SCHEMA, 

16 ATTR_ACTION_GROUPS, 

17 ATTR_ACTIONS, 

18 ATTR_DEBUG, 

19 ATTR_DELIVERY, 

20 ATTR_DELIVERY_SELECTION, 

21 ATTR_MEDIA, 

22 ATTR_MEDIA_CLIP_URL, 

23 ATTR_MEDIA_SNAPSHOT_URL, 

24 ATTR_MESSAGE_HTML, 

25 ATTR_PERSON_ID, 

26 ATTR_PRIORITY, 

27 ATTR_RECIPIENTS, 

28 ATTR_SCENARIOS_APPLY, 

29 ATTR_SCENARIOS_CONSTRAIN, 

30 ATTR_SCENARIOS_REQUIRE, 

31 CONF_DATA, 

32 CONF_DELIVERY, 

33 CONF_PERSON, 

34 DELIVERY_SELECTION_EXPLICIT, 

35 DELIVERY_SELECTION_FIXED, 

36 DELIVERY_SELECTION_IMPLICIT, 

37 OCCUPANCY_ALL, 

38 OCCUPANCY_ALL_IN, 

39 OCCUPANCY_ALL_OUT, 

40 OCCUPANCY_ANY_IN, 

41 OCCUPANCY_ANY_OUT, 

42 OCCUPANCY_NONE, 

43 OCCUPANCY_ONLY_IN, 

44 OCCUPANCY_ONLY_OUT, 

45 OPTION_MESSAGE_USAGE, 

46 OPTION_SIMPLIFY_TEXT, 

47 OPTION_STRIP_URLS, 

48 OPTION_UNIQUE_TARGETS, 

49 PRIORITY_MEDIUM, 

50 PRIORITY_VALUES, 

51 SCENARIO_NULL, 

52 SELECTION_BY_SCENARIO, 

53 STRICT_ACTION_DATA_SCHEMA, 

54 TARGET_USE_FIXED, 

55 TARGET_USE_MERGE_ALWAYS, 

56 TARGET_USE_MERGE_ON_DELIVERY_TARGETS, 

57 TARGET_USE_ON_NO_ACTION_TARGETS, 

58 TARGET_USE_ON_NO_DELIVERY_TARGETS, 

59 SelectionRank, 

60) 

61from custom_components.supernotify.archive import ArchivableObject 

62from custom_components.supernotify.delivery import Delivery, DeliveryRegistry 

63from custom_components.supernotify.envelope import Envelope 

64from custom_components.supernotify.model import ConditionVariables, MessageOnlyPolicy, SuppressionReason, Target, TargetRequired 

65from custom_components.supernotify.scenario import Scenario 

66 

67from .common import ensure_dict, ensure_list 

68from .context import Context 

69 

70if TYPE_CHECKING: 

71 from pathlib import Path 

72 

73 from custom_components.supernotify.people import PeopleRegistry 

74 from custom_components.supernotify.transport import ( 

75 Transport, 

76 ) 

77 

78_LOGGER = logging.getLogger(__name__) 

79 

80 

81HASH_PREP_TRANSLATION_TABLE = table = str.maketrans("", "", string.punctuation + string.digits) 

82 

83 

84class Notification(ArchivableObject): 

85 def __init__( 

86 self, 

87 context: Context, 

88 message: str | None = None, 

89 title: str | None = None, 

90 target: list[str] | str | None = None, 

91 action_data: dict[str, Any] | None = None, 

92 ) -> None: 

93 self.created: dt.datetime = dt.datetime.now(tz=dt.UTC) 

94 self.debug_trace: DebugTrace = DebugTrace(message=message, title=title, data=action_data, target=target) 

95 self._message: str | None = message 

96 self.context: Context = context 

97 self.people_registry: PeopleRegistry = context.people_registry 

98 self.delivery_registry: DeliveryRegistry = context.delivery_registry 

99 action_data = action_data or {} 

100 self.target: Target | None = Target(target) if target else None 

101 self.selected: Target = Target() 

102 self._title: str | None = title 

103 self.id = str(uuid.uuid1()) 

104 self.snapshot_image_path: Path | None = None 

105 self.delivered: int = 0 

106 self.errored: int = 0 

107 self.skipped: int = 0 

108 self.missed: int = 0 

109 self.delivered_envelopes: list[Envelope] = [] 

110 self.undelivered_envelopes: list[Envelope] = [] 

111 self.delivery_error: list[str] | None = None 

112 

113 self.validate_action_data(action_data) 

114 # for compatibility with other notify calls, pass thru surplus data to underlying delivery transports 

115 self.data: dict[str, Any] = {k: v for k, v in action_data.items() if k not in STRICT_ACTION_DATA_SCHEMA(action_data)} 

116 action_data = {k: v for k, v in action_data.items() if k not in self.data} 

117 

118 self.priority: str = action_data.get(ATTR_PRIORITY, PRIORITY_MEDIUM) 

119 self.message_html: str | None = action_data.get(ATTR_MESSAGE_HTML) 

120 self.required_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_REQUIRE)) 

121 self.applied_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_APPLY)) 

122 self.constrain_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_CONSTRAIN)) 

123 self.delivery_selection: str | None = action_data.get(ATTR_DELIVERY_SELECTION) 

124 self.delivery_overrides_type: str = action_data.get(ATTR_DELIVERY).__class__.__name__ 

125 self.delivery_overrides: dict[str, Any] = ensure_dict(action_data.get(ATTR_DELIVERY)) 

126 self.action_groups: list[str] | None = action_data.get(ATTR_ACTION_GROUPS) 

127 self.recipients_override: list[str] | None = action_data.get(ATTR_RECIPIENTS) 

128 self.data.update(action_data.get(ATTR_DATA, {})) 

129 self.media: dict[str, Any] = action_data.get(ATTR_MEDIA) or {} 

130 self.debug: bool = action_data.get(ATTR_DEBUG, False) 

131 self.actions: list[dict[str, Any]] = action_data.get(ATTR_ACTIONS) or [] 

132 self.delivery_results: dict[str, Any] = {} 

133 self.delivery_errors: dict[str, Any] = {} 

134 

135 self.selected_delivery_names: list[str] = [] 

136 self.enabled_scenarios: dict[str, Scenario] = {} 

137 self.selected_scenario_names: list[str] = [] 

138 self.people_by_occupancy: list[dict[str, Any]] = [] 

139 self.suppressed: SuppressionReason | None = None 

140 self.occupancy: dict[str, list[dict[str, Any]]] = {} 

141 self.condition_variables: ConditionVariables | None = None 

142 

143 async def initialize(self) -> None: 

144 """Async post-construction initialization""" 

145 if self.delivery_selection is None: 

146 if self.delivery_overrides_type in ("list", "str"): 

147 # a bare list of deliveries implies intent to restrict 

148 _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as explicit for type %s", self.delivery_overrides_type) 

149 self.delivery_selection = DELIVERY_SELECTION_EXPLICIT 

150 else: 

151 # whereas a dict may be used to tune or restrict 

152 self.delivery_selection = DELIVERY_SELECTION_IMPLICIT 

153 _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as implicit for type %s", self.delivery_overrides_type) 

154 

155 self.occupancy = self.people_registry.determine_occupancy() 

156 self.condition_variables = ConditionVariables( 

157 self.applied_scenario_names, 

158 self.required_scenario_names, 

159 self.constrain_scenario_names, 

160 self.priority, 

161 self.occupancy, 

162 self._message, 

163 self._title, 

164 ) # requires occupancy first 

165 

166 enabled_scenario_names: list[str] = list(self.applied_scenario_names) or [] 

167 self.selected_scenario_names = await self.select_scenarios() 

168 enabled_scenario_names.extend(self.selected_scenario_names) 

169 if self.constrain_scenario_names: 

170 enabled_scenario_names = [ 

171 s 

172 for s in enabled_scenario_names 

173 if (s in self.constrain_scenario_names or s in self.applied_scenario_names) and s != SCENARIO_NULL 

174 ] 

175 if self.required_scenario_names and not any(s in enabled_scenario_names for s in self.required_scenario_names): 

176 _LOGGER.info("SUPERNOTIFY suppressing notification, no required scenarios enabled") 

177 self.selected_delivery_names = [] 

178 self.suppress(SuppressionReason.NO_SCENARIO) 

179 else: 

180 for s in enabled_scenario_names: 

181 scenario_obj = self.context.scenario_registry.scenarios.get(s) 

182 if scenario_obj is not None: 

183 self.enabled_scenarios[s] = scenario_obj 

184 

185 self.selected_delivery_names = self.select_deliveries() 

186 if self.context.snoozer.is_global_snooze(self.priority): 

187 self.suppress(SuppressionReason.SNOOZED) 

188 self.default_media_from_actions() 

189 self.apply_enabled_scenarios() 

190 

191 def validate_action_data(self, action_data: dict[str, Any]) -> None: 

192 if action_data.get(ATTR_PRIORITY) and action_data.get(ATTR_PRIORITY) not in PRIORITY_VALUES: 

193 _LOGGER.warning("SUPERNOTIFY invalid priority %s - overriding to medium", action_data.get(ATTR_PRIORITY)) 

194 action_data[ATTR_PRIORITY] = PRIORITY_MEDIUM 

195 try: 

196 humanize.validate_with_humanized_errors(action_data, ACTION_DATA_SCHEMA) 

197 except vol.Invalid as e: 

198 _LOGGER.warning("SUPERNOTIFY invalid service data %s: %s", action_data, e) 

199 raise 

200 

201 def apply_enabled_scenarios(self) -> None: 

202 """Set media and action_groups from scenario if defined, first come first applied""" 

203 action_groups: list[str] = [] 

204 for scen_obj in self.enabled_scenarios.values(): 

205 if scen_obj.media and not self.media: 

206 self.media.update(scen_obj.media) 

207 if scen_obj.action_groups: 

208 action_groups.extend(ag for ag in scen_obj.action_groups if ag not in action_groups) 

209 if action_groups: 

210 self.action_groups = action_groups 

211 

212 def select_deliveries(self) -> list[str]: 

213 scenario_enable_deliveries: list[str] = [] 

214 default_enable_deliveries: list[str] = [] 

215 scenario_disable_deliveries: list[str] = [] 

216 

217 if self.delivery_selection != DELIVERY_SELECTION_FIXED: 

218 for scenario_name in self.enabled_scenarios: 

219 scenario_enable_deliveries.extend(self.context.scenario_registry.delivery_by_scenario.get(scenario_name, ())) 

220 if self.delivery_selection == DELIVERY_SELECTION_IMPLICIT: 

221 default_enable_deliveries = [d.name for d in self.context.delivery_registry.implicit_deliveries] 

222 

223 self.debug_trace.record_delivery_selection("scenario_enable_deliveries", scenario_enable_deliveries) 

224 self.debug_trace.record_delivery_selection("default_enable_deliveries", default_enable_deliveries) 

225 self.debug_trace.record_delivery_selection("scenario_disable_deliveries", scenario_disable_deliveries) 

226 

227 override_enable_deliveries = [] 

228 override_disable_deliveries = [] 

229 

230 for delivery, delivery_override in self.delivery_overrides.items(): 

231 if ( 

232 delivery_override is None or delivery_override.get(CONF_ENABLED, True) 

233 ) and delivery in self.context.delivery_registry.deliveries: 

234 override_enable_deliveries.append(delivery) 

235 elif delivery_override is not None and not delivery_override.get(CONF_ENABLED, True): 

236 override_disable_deliveries.append(delivery) 

237 

238 if self.delivery_selection != DELIVERY_SELECTION_FIXED: 

239 scenario_disable_deliveries = [ 

240 d.name 

241 for d in self.context.delivery_registry.deliveries.values() 

242 if d.selection == [SELECTION_BY_SCENARIO] 

243 and d.name not in scenario_enable_deliveries 

244 and (d.name not in override_enable_deliveries or self.delivery_selection != DELIVERY_SELECTION_EXPLICIT) 

245 ] 

246 all_enabled = list(set(scenario_enable_deliveries + default_enable_deliveries + override_enable_deliveries)) 

247 all_disabled = scenario_disable_deliveries + override_disable_deliveries 

248 self.debug_trace.record_delivery_selection("override_disable_deliveries", override_disable_deliveries) 

249 self.debug_trace.record_delivery_selection("override_enable_deliveries", override_enable_deliveries) 

250 

251 unsorted_objs: list[Delivery] = [self.delivery_registry.deliveries[d] for d in all_enabled if d not in all_disabled] 

252 first: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.FIRST] 

253 anywhere: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.ANY] 

254 last: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.LAST] 

255 selected = first + anywhere + last 

256 self.debug_trace.record_delivery_selection("ranked", selected) 

257 return selected 

258 

259 def default_media_from_actions(self) -> None: 

260 """If no media defined, look for iOS / Android actions that have media defined""" 

261 if self.media: 

262 return 

263 if self.data.get("image"): 

264 self.media[ATTR_MEDIA_SNAPSHOT_URL] = self.data.get("image") 

265 if self.data.get("video"): 

266 self.media[ATTR_MEDIA_CLIP_URL] = self.data.get("video") 

267 if self.data.get("attachment", {}).get("url"): 

268 url = self.data["attachment"]["url"] 

269 if url and url.endswith(".mp4") and not self.media.get(ATTR_MEDIA_CLIP_URL): 

270 self.media[ATTR_MEDIA_CLIP_URL] = url 

271 elif ( 

272 url 

273 and (url.endswith(".jpg") or url.endswith(".jpeg") or url.endswith(".png")) 

274 and not self.media.get(ATTR_MEDIA_SNAPSHOT_URL) 

275 ): 

276 self.media[ATTR_MEDIA_SNAPSHOT_URL] = url 

277 

278 def _render_scenario_templates( 

279 self, original: str | None, template_field: str, matching_ctx: str, delivery_name: str 

280 ) -> str | None: 

281 template_scenario_names = self.context.scenario_registry.content_scenario_templates.get(template_field, {}).get( 

282 delivery_name, [] 

283 ) 

284 if not template_scenario_names: 

285 return original 

286 context_vars = self.condition_variables.as_dict() if self.condition_variables else {} 

287 rendered = original if original is not None else "" 

288 for scen_obj in [obj for name, obj in self.enabled_scenarios.items() if name in template_scenario_names]: 

289 context_vars[matching_ctx] = rendered 

290 try: 

291 template_format = scen_obj.delivery.get(delivery_name, {}).get(CONF_DATA, {}).get(template_field) 

292 if template_format is not None: 

293 template = self.context.hass_api.template(template_format) 

294 rendered = template.async_render(variables=context_vars) 

295 except TemplateError as e: 

296 _LOGGER.warning("SUPERNOTIFY Rendering template %s for %s failed: %s", template_field, delivery_name, e) 

297 return rendered 

298 

299 def message(self, delivery_name: str) -> str | None: 

300 # message and title reverse the usual defaulting, delivery config overrides runtime call 

301 delivery_config: Delivery | None = self.context.delivery_registry.deliveries.get(delivery_name) 

302 msg: str | None = None 

303 if delivery_config is None: 

304 msg = self._message 

305 else: 

306 msg = delivery_config.message if delivery_config.message is not None else self._message 

307 message_usage: str = str(delivery_config.option_str(OPTION_MESSAGE_USAGE)) 

308 if message_usage.upper() == MessageOnlyPolicy.USE_TITLE: 

309 title = self.title(delivery_name, ignore_usage=True) 

310 if title: 

311 msg = title 

312 elif message_usage.upper() == MessageOnlyPolicy.COMBINE_TITLE: 

313 title = self.title(delivery_name, ignore_usage=True) 

314 if title: 

315 msg = f"{title} {msg}" 

316 if ( 

317 delivery_config.option_bool(OPTION_SIMPLIFY_TEXT) is True 

318 or delivery_config.option_bool(OPTION_STRIP_URLS) is True 

319 ): 

320 msg = delivery_config.transport.simplify(msg, strip_urls=delivery_config.option_bool(OPTION_STRIP_URLS)) 

321 

322 msg = self._render_scenario_templates(msg, "message_template", "notification_message", delivery_name) 

323 if msg is None: # keep mypy happy 

324 return None 

325 return str(msg) 

326 

327 def title(self, delivery_name: str, ignore_usage: bool = False) -> str | None: 

328 # message and title reverse the usual defaulting, delivery config overrides runtime call 

329 delivery_config: Delivery | None = self.context.delivery_registry.deliveries.get(delivery_name) 

330 title: str | None = None 

331 if delivery_config is None: 

332 title = self._title 

333 else: 

334 message_usage = delivery_config.option_str(OPTION_MESSAGE_USAGE) 

335 if not ignore_usage and message_usage.upper() in (MessageOnlyPolicy.USE_TITLE, MessageOnlyPolicy.COMBINE_TITLE): 

336 title = None 

337 else: 

338 title = delivery_config.title if delivery_config.title is not None else self._title 

339 if ( 

340 delivery_config.option_bool(OPTION_SIMPLIFY_TEXT) is True 

341 or delivery_config.option_bool(OPTION_STRIP_URLS) is True 

342 ): 

343 title = delivery_config.transport.simplify(title, strip_urls=delivery_config.option_bool(OPTION_STRIP_URLS)) 

344 title = self._render_scenario_templates(title, "title_template", "notification_title", delivery_name) 

345 if title is None: 

346 return None 

347 return str(title) 

348 

349 def suppress(self, reason: SuppressionReason) -> None: 

350 self.suppressed = reason 

351 _LOGGER.info(f"SUPERNOTIFY Suppressing notification, reason:{reason}, id:{self.id}") 

352 

353 async def deliver(self) -> bool: 

354 if self.suppressed is not None: 

355 _LOGGER.info("SUPERNOTIFY Suppressing globally silenced/snoozed notification (%s)", self.id) 

356 self.skipped += 1 

357 return False 

358 

359 _LOGGER.debug( 

360 "Message: %s, notification: %s, deliveries: %s", 

361 self._message, 

362 self.id, 

363 self.selected_delivery_names, 

364 ) 

365 

366 for delivery_name in self.selected_delivery_names: 

367 delivery = self.context.delivery_registry.deliveries.get(delivery_name) 

368 if delivery: 

369 await self.call_transport(delivery) 

370 else: 

371 _LOGGER.error(f"SUPERNOTIFY Unexpected missing delivery {delivery_name}") 

372 

373 if self.delivered == 0 and self.errored == 0: 

374 for delivery in self.context.delivery_registry.fallback_by_default_deliveries: 

375 if delivery.name not in self.selected_delivery_names: 

376 await self.call_transport(delivery) 

377 

378 if self.delivered == 0 and self.errored > 0: 

379 for delivery in self.context.delivery_registry.fallback_on_error_deliveries: 

380 if delivery.name not in self.selected_delivery_names: 

381 await self.call_transport(delivery) 

382 

383 return self.delivered > 0 

384 

385 async def call_transport(self, delivery: Delivery) -> None: 

386 try: 

387 transport: Transport = delivery.transport 

388 if not transport.override_enabled: 

389 self.skipped += 1 

390 _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on transport disabled", delivery) 

391 return 

392 

393 delivery_priorities = delivery.priority 

394 if self.priority and delivery_priorities and self.priority not in delivery_priorities: 

395 _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on priority (%s)", delivery, self.priority) 

396 self.skipped += 1 

397 return 

398 if not await delivery.evaluate_conditions(self.condition_variables): 

399 _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on conditions", delivery) 

400 self.skipped += 1 

401 return 

402 

403 recipients: list[Target] = self.generate_recipients(delivery) 

404 envelopes = self.generate_envelopes(delivery, recipients) 

405 for envelope in envelopes: 

406 try: 

407 if not await transport.deliver(envelope): 

408 self.missed += 1 

409 self.delivered += envelope.delivered 

410 self.errored += envelope.errored 

411 if envelope.delivered: 

412 self.delivered_envelopes.append(envelope) 

413 else: 

414 self.undelivered_envelopes.append(envelope) 

415 except Exception as e2: 

416 _LOGGER.exception("SUPERNOTIFY Failed to deliver %s: %s", envelope.delivery_name, e2) 

417 self.errored += 1 

418 transport.record_error(str(e2), method="deliver") 

419 envelope.delivery_error = format_exception(e2) 

420 self.undelivered_envelopes.append(envelope) 

421 

422 except Exception as e: 

423 _LOGGER.exception("SUPERNOTIFY Failed to notify using %s", delivery.name) 

424 _LOGGER.debug("SUPERNOTIFY %s delivery failure", delivery, exc_info=True) 

425 self.delivery_errors[delivery.name] = format_exception(e) 

426 

427 def hash(self) -> int: 

428 """Alpha hash to reduce noise from messages with timestamps or incrementing counts""" 

429 

430 def alphaize(v: str | None) -> str | None: 

431 return v.translate(HASH_PREP_TRANSLATION_TABLE) if v else v 

432 

433 return hash((alphaize(self._message), alphaize(self._title))) 

434 

435 def contents(self, minimal: bool = False) -> dict[str, Any]: 

436 """ArchiveableObject implementation""" 

437 object_refs = ("context", "people_registry", "delivery_registry") 

438 sanitized = {k: v for k, v in self.__dict__.items() if k not in object_refs and not k.startswith("_")} 

439 sanitized["delivered_envelopes"] = [e.contents(minimal=minimal) for e in self.delivered_envelopes] 

440 sanitized["undelivered_envelopes"] = [e.contents(minimal=minimal) for e in self.undelivered_envelopes] 

441 sanitized["enabled_scenarios"] = {k: v.contents(minimal=minimal) for k, v in self.enabled_scenarios.items()} 

442 if sanitized["target"]: 

443 sanitized["target"] = sanitized["target"].as_dict() 

444 if sanitized["selected"]: 

445 sanitized["selected"] = sanitized["selected"].as_dict() 

446 

447 for state, person_objs in sanitized["occupancy"].items(): 

448 sanitized["occupancy"][state] = [ 

449 {"person": person_obj["person"], "state": person_obj.get("state"), "user_id": person_obj.get("user_id")} 

450 for person_obj in person_objs 

451 ] 

452 

453 if self.debug_trace: 

454 sanitized["debug_trace"] = self.debug_trace.contents() 

455 else: 

456 del sanitized["debug_trace"] 

457 return sanitized 

458 

459 def base_filename(self) -> str: 

460 """ArchiveableObject implementation""" 

461 return f"{self.created.isoformat()[:16]}_{self.id}" 

462 

463 def delivery_data(self, delivery_name: str) -> dict[str, Any]: 

464 delivery_override = self.delivery_overrides.get(delivery_name) 

465 return delivery_override.get(CONF_DATA) if delivery_override else {} 

466 

467 def delivery_scenarios(self, delivery_name: str) -> dict[str, Scenario]: 

468 return { 

469 s: obj 

470 for s, obj in self.enabled_scenarios.items() 

471 if delivery_name in self.context.scenario_registry.delivery_by_scenario.get(s, []) 

472 } 

473 

474 async def select_scenarios(self) -> list[str]: 

475 return [s.name for s in self.context.scenario_registry.scenarios.values() if await s.evaluate(self.condition_variables)] 

476 

477 def merge(self, attribute: str, delivery_name: str) -> dict[str, Any]: 

478 delivery: dict[str, Any] = self.delivery_overrides.get(delivery_name, {}) 

479 base: dict[str, Any] = delivery.get(attribute, {}) 

480 for scenario in self.enabled_scenarios.values(): 

481 if scenario and hasattr(scenario, attribute): 

482 base.update(getattr(scenario, attribute)) 

483 if hasattr(self, attribute): 

484 base.update(getattr(self, attribute)) 

485 return base 

486 

487 def filter_people_by_occupancy(self, occupancy: str) -> list[dict[str, Any]]: 

488 people = list(self.people_registry.people.values()) 

489 if occupancy == OCCUPANCY_ALL: 

490 return people 

491 if occupancy == OCCUPANCY_NONE: 

492 return [] 

493 

494 away = self.occupancy[STATE_NOT_HOME] 

495 at_home = self.occupancy[STATE_HOME] 

496 if occupancy == OCCUPANCY_ALL_IN: 

497 return people if len(away) == 0 else [] 

498 if occupancy == OCCUPANCY_ALL_OUT: 

499 return people if len(at_home) == 0 else [] 

500 if occupancy == OCCUPANCY_ANY_IN: 

501 return people if len(at_home) > 0 else [] 

502 if occupancy == OCCUPANCY_ANY_OUT: 

503 return people if len(away) > 0 else [] 

504 if occupancy == OCCUPANCY_ONLY_IN: 

505 return at_home 

506 if occupancy == OCCUPANCY_ONLY_OUT: 

507 return away 

508 

509 _LOGGER.warning("SUPERNOTIFY Unknown occupancy tested: %s", occupancy) 

510 return [] 

511 

512 def generate_recipients(self, delivery: Delivery) -> list[Target]: 

513 

514 if delivery.target_required == TargetRequired.NEVER: 

515 # don't waste time computing targets for deliveries that don't need them 

516 return [Target(None, target_data=delivery.data)] 

517 

518 computed_target: Target 

519 

520 if delivery.target_usage == TARGET_USE_FIXED: 

521 if delivery.target: 

522 computed_target = delivery.target.safe_copy() 

523 self.debug_trace.record_target(delivery.name, "1a_delivery_default_fixed", computed_target) 

524 else: 

525 computed_target = Target(None, target_data=delivery.data) 

526 self.debug_trace.record_target(delivery.name, "1b_delivery_default_fixed_empty", computed_target) 

527 

528 elif not self.target: 

529 # Unless there are explicit targets, include everyone on the people registry 

530 computed_target = self.default_person_ids(delivery) 

531 self.debug_trace.record_target(delivery.name, "1c_no_action_target", computed_target) 

532 else: 

533 computed_target = self.target.safe_copy() 

534 self.debug_trace.record_target(delivery.name, "1d_action_target", computed_target) 

535 

536 # 1st round of filtering for snooze and resolving people->direct targets 

537 computed_target = self.context.snoozer.filter_recipients(computed_target, self.priority, delivery) 

538 self.debug_trace.record_target(delivery.name, "2a_post_snooze", computed_target) 

539 # turn person_ids into emails and phone numbers 

540 computed_target += self.resolve_indirect_targets(computed_target, delivery) 

541 self.debug_trace.record_target(delivery.name, "2b_resolve_indirect", computed_target) 

542 # filter out target not required for this delivery 

543 computed_target = delivery.select_targets(computed_target) 

544 self.debug_trace.record_target(delivery.name, "2c_delivery_selection", computed_target) 

545 primary_count = len(computed_target) 

546 

547 if delivery.target_usage == TARGET_USE_ON_NO_DELIVERY_TARGETS: 

548 if not computed_target.has_targets() and delivery.target: 

549 computed_target += delivery.target 

550 self.debug_trace.record_target(delivery.name, "3a_delivery_default_no_delivery_targets", computed_target) 

551 elif delivery.target_usage == TARGET_USE_ON_NO_ACTION_TARGETS: 

552 if not self.target and delivery.target: 

553 computed_target += delivery.target 

554 self.debug_trace.record_target(delivery.name, "3b_delivery_default_no_action_targets", computed_target) 

555 elif delivery.target_usage == TARGET_USE_MERGE_ON_DELIVERY_TARGETS: 

556 # merge in the delivery defaults if there's a target defined in action call 

557 if computed_target.has_targets() and delivery.target: 

558 computed_target += delivery.target 

559 self.debug_trace.record_target(delivery.name, "3c_delivery_merge_on_delivery_targets", computed_target) 

560 elif delivery.target_usage == TARGET_USE_MERGE_ALWAYS: 

561 # merge in the delivery defaults even if there's not a target defined in action call 

562 if delivery.target: 

563 computed_target += delivery.target 

564 self.debug_trace.record_target(delivery.name, "3d_delivery_merge_always_targets", computed_target) 

565 elif delivery.target_usage == TARGET_USE_FIXED: 

566 _LOGGER.debug("SUPERNOTIFY Fixed target on delivery %s", delivery.name) 

567 else: 

568 self.debug_trace.record_target(delivery.name, "3f_no_target_usage_match", computed_target) 

569 _LOGGER.debug("SUPERNOTIFY No useful target definition for delivery %s", delivery.name) 

570 

571 if len(computed_target) > primary_count: 

572 _LOGGER.debug( 

573 "SUPERNOTIFY Delivery config added %s targets for %s", len(computed_target) - primary_count, delivery.name 

574 ) 

575 

576 # 2nd round of filtering for snooze and resolving people->direct targets after delivery target applied 

577 computed_target = self.context.snoozer.filter_recipients(computed_target, self.priority, delivery) 

578 self.debug_trace.record_target(delivery.name, "4a_post_snooze", computed_target) 

579 computed_target += self.resolve_indirect_targets(computed_target, delivery) 

580 self.debug_trace.record_target(delivery.name, "4b_resolved_indirect_targets", computed_target) 

581 computed_target = delivery.select_targets(computed_target) 

582 self.debug_trace.record_target(delivery.name, "4c_delivery_selection", computed_target) 

583 

584 split_targets: list[Target] = computed_target.split_by_target_data() 

585 self.debug_trace.record_target(delivery.name, "5a_delivery_split_targets", split_targets) 

586 direct_targets: list[Target] = [t.direct() for t in split_targets] 

587 self.debug_trace.record_target(delivery.name, "5b_narrow_to_direct", direct_targets) 

588 if delivery.options.get(OPTION_UNIQUE_TARGETS, False): 

589 direct_targets = [t - self.selected for t in direct_targets] 

590 self.debug_trace.record_target(delivery.name, "5c_make_unique_across_deliveries", direct_targets) 

591 for direct_target in direct_targets: 

592 self.selected += direct_target 

593 self.debug_trace.record_target(delivery.name, "6_final_cut", direct_targets) 

594 return direct_targets 

595 

596 def default_person_ids(self, delivery: Delivery) -> Target: 

597 # If target not specified on service call or delivery, then default to std list of recipients 

598 people: list[dict[str, Any]] = self.filter_people_by_occupancy(delivery.occupancy) 

599 people = [p for p in people if self.recipients_override is None or p.get(CONF_PERSON) in self.recipients_override] 

600 return Target({ATTR_PERSON_ID: [p[CONF_PERSON] for p in people if CONF_PERSON in p]}) 

601 

602 def resolve_indirect_targets(self, target: Target, delivery: Delivery) -> Target: 

603 # enrich data selected in configuration for this delivery, from direct target definition or attrs like email or phone 

604 resolved: Target = Target() 

605 

606 for person_id in target.person_ids: 

607 person = self.people_registry.people.get(person_id) 

608 if person and person.get(CONF_ENABLED, True): 

609 recipient_target = Target({ATTR_PERSON_ID: [person_id]}) 

610 personal_target: Target | None = person.get(CONF_TARGET) 

611 if personal_target is not None and personal_target.has_resolved_target(): 

612 recipient_target += personal_target 

613 personal_delivery: dict[str, Any] | None = person.get(CONF_DELIVERY, {}).get(delivery.name) 

614 if personal_delivery: 

615 # TODO: replace all this hackery with people/people_registry improvements 

616 if personal_delivery.get(CONF_ENABLED, True) and personal_delivery.get(CONF_TARGET): 

617 personal_delivery_target: Target = Target( 

618 personal_delivery.get(CONF_TARGET), 

619 target_data=personal_delivery.get(CONF_DATA), 

620 target_specific_data=True, 

621 ) 

622 personal_delivery_target.extend(ATTR_PERSON_ID, [person_id]) 

623 if personal_delivery_target is not None and personal_delivery_target.has_resolved_target(): 

624 recipient_target += personal_delivery_target 

625 

626 resolved += recipient_target 

627 return resolved 

628 

629 def generate_envelopes(self, delivery: Delivery, targets: list[Target]) -> list[Envelope]: 

630 # now the list of recipients determined, resolve this to target addresses or entities 

631 

632 default_data: dict[str, Any] = delivery.data 

633 

634 envelopes = [] 

635 for target in targets: 

636 if target.has_resolved_target() or delivery.target_required != TargetRequired.ALWAYS: 

637 envelope_data = {} 

638 envelope_data.update(default_data) 

639 envelope_data.update(self.data) 

640 if target.target_data: 

641 envelope_data.update(target.target_data) 

642 envelopes.append(Envelope(delivery, self, target, envelope_data)) 

643 

644 return envelopes 

645 

646 

647class DebugTrace: 

648 def __init__( 

649 self, 

650 message: str | None, 

651 title: str | None, 

652 data: dict[str, Any] | None, 

653 target: dict[str, list[str]] | list[str] | str | None, 

654 ) -> None: 

655 self.message: str | None = message 

656 self.title: str | None = title 

657 self.data: dict[str, Any] | None = data 

658 self.target: dict[str, list[str]] | list[str] | str | None = target 

659 self.resolved: dict[str, dict[str, Any]] = {} 

660 self.delivery_selection: dict[str, list[str]] = {} 

661 self._last_stage: dict[str, str] = {} 

662 

663 def contents( 

664 self, 

665 ) -> dict[str, Any]: 

666 return { 

667 "message": self.message, 

668 "title": self.title, 

669 "data": self.data, 

670 "target": self.target, 

671 "resolved": self.resolved, 

672 "delivery_selection": self.delivery_selection, 

673 } 

674 

675 def record_target(self, delivery_name: str, stage: str, computed: Target | list[Target]) -> None: 

676 """Debug support for recording detailed target resolution in archived notification""" 

677 self.resolved.setdefault(delivery_name, {}) 

678 self.resolved[delivery_name].setdefault(stage, {}) 

679 if isinstance(computed, Target): 

680 combined = computed 

681 else: 

682 combined = Target() 

683 for target in ensure_list(computed): 

684 combined += target 

685 result: str | dict[str, Any] = combined.as_dict() 

686 if self._last_stage.get(delivery_name): 

687 last_target = self.resolved[delivery_name][self._last_stage[delivery_name]] 

688 if last_target is not None and last_target == result: 

689 result = "NO_CHANGE" 

690 

691 self.resolved[delivery_name][stage] = result 

692 self._last_stage[delivery_name] = stage 

693 

694 def record_delivery_selection(self, stage: str, delivery_selection: list[str]) -> None: 

695 """Debug support for recording detailed target resolution in archived notification""" 

696 self.delivery_selection[stage] = delivery_selection