Coverage for custom_components/supernotify/notification.py: 90%

499 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-03-28 19:39 +0000

1import datetime as dt 

2import logging 

3import uuid 

4from traceback import format_exception 

5from typing import TYPE_CHECKING, Any, cast 

6 

7import voluptuous as vol 

8from homeassistant.components.notify.const import ATTR_DATA 

9from voluptuous import humanize 

10 

11from custom_components.supernotify.schema import SelectionRank 

12 

13from .archive import ArchivableObject 

14from .common import ensure_list, nullable_ensure_list, sanitize 

15from .const import ( 

16 ATTR_ACTION_GROUPS, 

17 ATTR_ACTIONS, 

18 ATTR_DEBUG, 

19 ATTR_DELIVERY, 

20 ATTR_DELIVERY_SELECTION, 

21 ATTR_FORCE_RESEND, 

22 ATTR_MEDIA, 

23 ATTR_MEDIA_CLIP_URL, 

24 ATTR_MEDIA_SNAPSHOT_URL, 

25 ATTR_MESSAGE_HTML, 

26 ATTR_PERSON_ID, 

27 ATTR_PRIORITY, 

28 ATTR_RECIPIENTS, 

29 ATTR_SCENARIOS_APPLY, 

30 ATTR_SCENARIOS_CONSTRAIN, 

31 ATTR_SCENARIOS_REQUIRE, 

32 ATTR_SPOKEN_MESSAGE, 

33 DELIVERY_SELECTION_EXPLICIT, 

34 DELIVERY_SELECTION_FIXED, 

35 DELIVERY_SELECTION_IMPLICIT, 

36 OPTION_UNIQUE_TARGETS, 

37 PRIORITY_MEDIUM, 

38 PRIORITY_VALUES, 

39 TARGET_USE_FIXED, 

40 TARGET_USE_MERGE_ALWAYS, 

41 TARGET_USE_MERGE_ON_DELIVERY_TARGETS, 

42 TARGET_USE_ON_NO_ACTION_TARGETS, 

43 TARGET_USE_ON_NO_DELIVERY_TARGETS, 

44) 

45from .envelope import Envelope 

46from .model import ( 

47 ConditionVariables, 

48 DebugTrace, 

49 DeliveryCustomization, 

50 SuppressionReason, 

51 Target, 

52 TargetRequired, 

53) 

54from .schema import ACTION_DATA_SCHEMA, STRICT_ACTION_DATA_SCHEMA, Outcome 

55 

56if TYPE_CHECKING: 

57 from .context import Context 

58 from .delivery import Delivery, DeliveryRegistry 

59 from .people import PeopleRegistry, Recipient 

60 from .scenario import Scenario 

61 from .transport import ( 

62 Transport, 

63 ) 

64 

# Module-level logger, named after this module
_LOGGER = logging.getLogger(__name__)

# Deliveries mapping keys for debug / archive
# These are the per-delivery keys used inside `Notification.deliveries`:
# delivered / suppressed / failed each hold a list of envelopes, while
# skipped holds a single summary dict for a delivery that produced no envelope.
KEY_DELIVERED = "delivered"
KEY_SUPPRESSED = "suppressed"
KEY_FAILED = "failed"
KEY_SKIPPED = "skipped"

# supernotify specific data items not to be passed to transports in data
INTERNAL_DATA_KEYS = (ATTR_FORCE_RESEND, ATTR_SPOKEN_MESSAGE)

# Readability aliases for the key types of the `Notification.deliveries` mapping
type t_delivery_name = str
type t_outcome = str

78 

79 

80class Notification(ArchivableObject): 

def __init__(
    self,
    context: Context,
    message: str | None = None,
    title: str | None = None,
    target: list[str] | str | None = None,
    action_data: dict[str, Any] | None = None,
) -> None:
    """Create a notification from the arguments of a notify action call.

    Args:
        context: shared context; its people and delivery registries are kept on the notification.
        message: notification message body.
        title: optional notification title.
        target: explicit target(s) from the action call, wrapped in a ``Target`` when truthy.
        action_data: the ``data`` section of the action call. Keys present in the output
            of ``STRICT_ACTION_DATA_SCHEMA`` configure the notification; all other keys
            are kept in ``extra_data`` for pass-through.

    Raises:
        vol.Invalid: if ``action_data`` fails validation (see ``validate_action_data``).
    """
    self.created: dt.datetime = dt.datetime.now(tz=dt.UTC)
    # the debug trace records the call arguments exactly as received
    self.debug_trace: DebugTrace = DebugTrace(message=message, title=title, data=action_data, target=target)
    self.message: str | None = message
    self.context: Context = context
    self.people_registry: PeopleRegistry = context.people_registry
    self.delivery_registry: DeliveryRegistry = context.delivery_registry
    action_data = action_data or {}
    self._target: Target | None = Target(target) if target else None
    self._already_selected: Target = Target()
    self._title: str | None = title
    self.id = str(uuid.uuid1())
    # running totals, maintained by record_result() and deliver()
    self.delivered: int = 0
    self.error_count: int = 0
    self.skipped: int = 0
    self.failed: int = 0
    self.suppressed: int = 0
    self.fallback: int = 0
    self.dupe: bool = False
    self.deliveries: dict[t_delivery_name, dict[t_outcome, list[str] | list[Envelope] | dict[str, Any]]] = {}
    # must exist before validate_action_data(), which may call suppress()
    self._skip_reasons: list[SuppressionReason] = []

    self.validate_action_data(action_data)
    # for compatibility with other notify calls, pass thru surplus data to underlying delivery transports
    # The strict schema is applied once up front rather than once per key; it is
    # not applied at all for empty action data, matching the previous behaviour.
    recognised_data: Any = STRICT_ACTION_DATA_SCHEMA(action_data) if action_data else {}
    self.extra_data: dict[str, Any] = {k: v for k, v in action_data.items() if k not in recognised_data}

    action_data = {k: v for k, v in action_data.items() if k not in self.extra_data}
    self.extra_data.update(action_data.get(ATTR_DATA, {}))  # nested `data` could be supernotify or target service

    self.priority: str = action_data.get(ATTR_PRIORITY, PRIORITY_MEDIUM)
    self.message_html: str | None = action_data.get(ATTR_MESSAGE_HTML)
    self.force_resend: bool = action_data.get(ATTR_FORCE_RESEND, False)
    self.required_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_REQUIRE))
    self.applied_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_APPLY))
    self.constrain_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_CONSTRAIN))
    self.delivery_selection: str | None = action_data.get(ATTR_DELIVERY_SELECTION)
    self.delivery_overrides: dict[str, DeliveryCustomization] = {}

    delivery_data = action_data.get(ATTR_DELIVERY)
    if isinstance(delivery_data, list):
        # a bare list of deliveries implies intent to restrict
        _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as explicit for list %s", delivery_data)
        if self.delivery_selection is None:
            self.delivery_selection = DELIVERY_SELECTION_EXPLICIT
        self.delivery_overrides = {k: DeliveryCustomization({}) for k in delivery_data}
    elif isinstance(delivery_data, str) and delivery_data:
        # a single delivery name likewise implies intent to restrict
        _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as explicit for single %s", delivery_data)
        if self.delivery_selection is None:
            self.delivery_selection = DELIVERY_SELECTION_EXPLICIT
        self.delivery_overrides = {delivery_data: DeliveryCustomization({})}
    elif isinstance(delivery_data, dict):
        # whereas a dict may be used to tune or restrict
        if self.delivery_selection is None:
            self.delivery_selection = DELIVERY_SELECTION_IMPLICIT
            _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as implicit for mapping %s", delivery_data)
        self.delivery_overrides = {k: DeliveryCustomization(v) for k, v in delivery_data.items()}
    elif delivery_data:
        _LOGGER.warning("SUPERNOTIFY Unable to interpret delivery data %s", delivery_data)

    # anything that has not chosen a selection mode by now is implicit
    if self.delivery_selection is None:
        self.delivery_selection = DELIVERY_SELECTION_IMPLICIT

    self.action_groups: list[str] | None = nullable_ensure_list(action_data.get(ATTR_ACTION_GROUPS))
    self.recipients_override: list[str] | None = nullable_ensure_list(action_data.get(ATTR_RECIPIENTS))
    self.media: dict[str, Any] = action_data.get(ATTR_MEDIA) or {}
    self.debug: bool = action_data.get(ATTR_DEBUG, False)
    self.actions: list[dict[str, Any]] = ensure_list(action_data.get(ATTR_ACTIONS))

    # populated later by initialize()
    self.selected_deliveries: dict[str, dict[str, Any]] = {}
    self.enabled_scenarios: dict[str, Scenario] = {}
    self.selected_scenario_names: list[str] = []
    self._suppression_reason: SuppressionReason | None = None
    self._delivery_error: list[str] | None = None
    self.condition_variables: ConditionVariables

167 

async def initialize(self) -> None:
    """Async post-construction initialization

    Determines occupancy, builds the condition variables, works out which
    scenarios are enabled, and then either suppresses the notification (when
    none of the required scenarios is enabled) or selects deliveries, applies
    a global snooze if one is active and applies the enabled scenarios.
    Finally, media is derived from the pass-through data if none is set.
    """
    # NOTE(review): block nesting was reconstructed from a listing that had lost
    # its indentation - confirm the extent of the `else` branch against the original.
    self.occupancy: dict[str, list[Recipient]] = self.people_registry.determine_occupancy()
    self.condition_variables = ConditionVariables(
        self.applied_scenario_names,
        self.required_scenario_names,
        self.constrain_scenario_names,
        self.priority,
        self.occupancy,
        self.message,
        self._title,
        self.extra_data,
    )  # requires occupancy first

    # start from the scenarios applied in the action call, then add those whose conditions match
    enabled_scenario_names: list[str] = list(self.applied_scenario_names) or []
    self.selected_scenario_names = await self.select_scenarios()
    enabled_scenario_names.extend(self.selected_scenario_names)
    if self.constrain_scenario_names:
        # constraints narrow the enabled scenarios, but never remove explicitly applied ones
        enabled_scenario_names = [
            s for s in enabled_scenario_names if (s in self.constrain_scenario_names or s in self.applied_scenario_names)
        ]
    if self.required_scenario_names and not any(s in enabled_scenario_names for s in self.required_scenario_names):
        _LOGGER.info("SUPERNOTIFY suppressing notification, no required scenarios enabled")
        self.selected_deliveries = {}
        self.suppress(SuppressionReason.NO_SCENARIO)
    else:
        for s in enabled_scenario_names:
            # names with no matching registry entry are silently ignored
            scenario_obj = self.context.scenario_registry.scenarios.get(s)
            if scenario_obj is not None:
                self.enabled_scenarios[s] = scenario_obj

        self.selected_deliveries = self.select_deliveries()
        if self.context.snoozer.is_global_snooze(self.priority):
            self.suppress(SuppressionReason.SNOOZED)
        self.apply_enabled_scenarios()

    if not self.media:
        # fall back to media hints found in the pass-through data
        self.media = self.media_requirements(self.extra_data)

206 

def outcome(self) -> Outcome:
    """Summarise the overall result of this notification as a single Outcome.

    The checks are ranked and the first one that matches wins: errors, then
    duplicate suppression, then nothing delivered, then fallback used, then
    some deliveries skipped; otherwise the notification was a success.
    """
    ranked_checks = (
        (self.error_count > 0, Outcome.ERROR),
        (self.dupe, Outcome.DUPE),
        (not self.delivered, Outcome.NO_DELIVERY),
        (self.fallback, Outcome.FALLBACK_DELIVERY),
        (self.skipped, Outcome.PARTIAL_DELIVERY),
    )
    for matched, verdict in ranked_checks:
        if matched:
            return verdict
    return Outcome.SUCCESS

219 

def media_requirements(self, data: dict[str, Any]) -> dict[str, Any]:
    """If no media defined, look for iOS / Android actions that have media defined

    Example is the Frigate blueprint, which generates `image`, `video` etc
    in the `data` section, that can also be used for email attachments

    Args:
        data: pass-through data from the action call; may be empty or None.

    Returns:
        A dict with a snapshot URL and/or clip URL entry for whatever was found,
        or an empty dict. `image` / `video` take precedence over an `attachment`
        URL of the same kind.
    """
    if not data:
        return {}
    media_dict: dict[str, Any] = {}
    if data.get("image"):
        media_dict[ATTR_MEDIA_SNAPSHOT_URL] = data.get("image")
    if data.get("video"):
        media_dict[ATTR_MEDIA_CLIP_URL] = data.get("video")

    # `attachment` arrives from arbitrary callers, so tolerate a null or
    # non-mapping value, and a non-string url, instead of raising
    attachment = data.get("attachment")
    url = attachment.get("url") if isinstance(attachment, dict) else None
    if isinstance(url, str) and url:
        if url.endswith(".mp4") and not media_dict.get(ATTR_MEDIA_CLIP_URL):
            media_dict[ATTR_MEDIA_CLIP_URL] = url
        elif url.endswith((".jpg", ".jpeg", ".png")) and not media_dict.get(ATTR_MEDIA_SNAPSHOT_URL):
            media_dict[ATTR_MEDIA_SNAPSHOT_URL] = url
    return media_dict

244 

def validate_action_data(self, action_data: dict[str, Any]) -> None:
    """Check the action data, suppressing the notification and raising if it is invalid.

    A priority outside the known values is accepted (and logged) as a custom
    priority, provided it is a simple str / int / float value. The whole
    mapping is then validated against the action data schema.

    Raises:
        vol.Invalid: for a non-simple priority value, or for any schema
            validation failure (other voluptuous errors are re-raised as Invalid).
    """
    priority = action_data.get(ATTR_PRIORITY)
    if priority:
        if not isinstance(priority, (str, int, float)):
            _LOGGER.info("SUPERNOTIFY Invalid priority %s", priority)
            self.suppress(SuppressionReason.INVALID_ACTION_DATA)
            raise vol.Invalid("Priority value must be a simple value")
        if priority not in PRIORITY_VALUES:
            _LOGGER.info("SUPERNOTIFY custom priority %s", priority)

    try:
        humanize.validate_with_humanized_errors(action_data, ACTION_DATA_SCHEMA)
    except vol.Invalid as invalid:
        # already the exception type callers expect, so re-raise untouched
        _LOGGER.warning("SUPERNOTIFY invalid action data %s: %s", action_data, invalid)
        self.suppress(SuppressionReason.INVALID_ACTION_DATA)
        raise
    except vol.error.Error as failure:
        # any other voluptuous error is converted to Invalid, keeping the cause
        _LOGGER.warning("SUPERNOTIFY failed to validate action data %s: %s", action_data, failure)
        self.suppress(SuppressionReason.INVALID_ACTION_DATA)
        raise vol.Invalid(f"Unable to validate action data - {failure}") from failure

264 

def apply_enabled_scenarios(self) -> None:
    """Merge media and action_groups from the enabled scenarios into the notification

    Scenarios are visited in `enabled_scenarios` order. Scenario media is merged
    into any media already on the notification, with later values replacing
    earlier ones for the same key. Action groups from scenarios are de-duplicated
    among themselves and appended to any action groups from the action call.
    """
    scenario_groups: list[str] = []
    for scenario in self.enabled_scenarios.values():
        if scenario.media:
            if self.media:
                self.media.update(scenario.media)
            else:
                # copy, so that merging a later scenario's media into self.media
                # cannot modify this scenario's own configuration
                self.media = dict(scenario.media)
        if scenario.action_groups:
            for group in scenario.action_groups:
                if group not in scenario_groups:
                    scenario_groups.append(group)
    # self.action_groups only accessed from inside Envelope
    if self.action_groups:
        self.action_groups.extend(scenario_groups)
    else:
        self.action_groups = scenario_groups

281 

def select_deliveries(self) -> dict[str, dict[str, Any]]:
    """Work out which deliveries this notification should use, in ranked order.

    Candidate delivery names are gathered from the enabled scenarios, from the
    recipients, from the registry's implicit deliveries (implicit selection only)
    and from the delivery overrides in the action call. Disabled names are then
    removed, the remainder resolved to Delivery objects and ordered by selection
    rank (first, any, last). Each source list is recorded in the debug trace.

    Returns:
        Mapping of delivery name to a details dict. For a delivery enabled only
        by recipients, the details hold a "recipients" list of entity ids;
        otherwise the details dict is empty.
    """
    # NOTE(review): block nesting was reconstructed from a listing that had lost
    # its indentation - confirm which statements sit under the FIXED check.
    scenario_enable_deliveries: list[str] = []
    scenario_disable_deliveries: list[str] = []
    default_enable_deliveries: list[str] = []
    recipients_enable_deliveries: list[str] = []

    if self.delivery_selection != DELIVERY_SELECTION_FIXED:
        for scenario in self.enabled_scenarios.values():
            scenario_enable_deliveries.extend(scenario.enabling_deliveries())
        for scenario in self.enabled_scenarios.values():
            scenario_disable_deliveries.extend(scenario.disabling_deliveries())

        # de-duplicate; note that this does not preserve order
        scenario_enable_deliveries = list(set(scenario_enable_deliveries))
        scenario_disable_deliveries = list(set(scenario_disable_deliveries))

        for recipient in self.all_recipients():
            recipients_enable_deliveries.extend(recipient.enabling_delivery_names())
    if self.delivery_selection == DELIVERY_SELECTION_IMPLICIT:
        # all deliveries with SELECTION_DEFAULT in CONF_SELECTION
        default_enable_deliveries = [d.name for d in self.context.delivery_registry.implicit_deliveries]

    self.debug_trace.record_delivery_selection("scenario_enable_deliveries", scenario_enable_deliveries)
    self.debug_trace.record_delivery_selection("scenario_disable_deliveries", scenario_disable_deliveries)
    self.debug_trace.record_delivery_selection("default_enable_deliveries", default_enable_deliveries)
    self.debug_trace.record_delivery_selection("recipient_enable_deliveries", recipients_enable_deliveries)

    override_enable_deliveries: list[str] = []
    override_disable_deliveries: list[str] = []

    # apply the deliveries defined in the notification action call
    for delivery, delivery_override in self.delivery_overrides.items():
        # An override enables a delivery the registry has enabled unless it says
        # otherwise, and enables a registry-disabled delivery only when the
        # override explicitly sets enabled to True.
        if (
            (delivery_override is None or delivery_override.enabled is True)
            and delivery in self.context.delivery_registry.enabled_deliveries
        ) or (
            (delivery_override is not None and delivery_override.enabled is True)
            and delivery in self.context.delivery_registry.disabled_deliveries
        ):
            override_enable_deliveries.append(delivery)
        elif delivery_override is not None and delivery_override.enabled is False:
            override_disable_deliveries.append(delivery)

    # if self.delivery_selection != DELIVERY_SELECTION_FIXED:
    #    scenario_disable_deliveries = [
    #        d.name
    #        for d in self.context.delivery_registry.deliveries.values()
    #        if d.selection == [SELECTION_BY_SCENARIO]
    #        and d.name not in scenario_enable_deliveries
    #        and (d.name not in override_enable_deliveries or self.delivery_selection != DELIVERY_SELECTION_EXPLICIT)
    #    ]
    all_global_enabled: list[str] = list(
        set(scenario_enable_deliveries + default_enable_deliveries + override_enable_deliveries)
    )
    # recipient-enabled names are appended as-is, without de-duplication
    all_enabled: list[str] = all_global_enabled + recipients_enable_deliveries
    # override_enable_deliveries takes precedence: if the action call explicitly
    # re-enables a delivery that a scenario disabled, remove it from all_disabled.
    all_disabled: list[str] = [
        d for d in scenario_disable_deliveries + override_disable_deliveries if d not in override_enable_deliveries
    ]
    override_enabled: list[str] = list(set(scenario_enable_deliveries + override_enable_deliveries))
    self.debug_trace.record_delivery_selection("override_disable_deliveries", override_disable_deliveries)
    self.debug_trace.record_delivery_selection("override_enable_deliveries", override_enable_deliveries)

    # resolve names to Delivery objects; unknown names resolve to None and are dropped below
    unsorted_maybe_objs: list[Delivery | None] = [
        self.delivery_registry.deliveries.get(d) for d in all_enabled if d not in all_disabled
    ]
    # a delivery that is not enabled survives only if a scenario or override enabled it
    unsorted_objs: list[Delivery] = [
        d for d in unsorted_maybe_objs if d is not None and (d.enabled or d.name in override_enabled)
    ]
    first: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.FIRST]
    anywhere: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.ANY]
    last: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.LAST]
    selected = first + anywhere + last
    self.debug_trace.record_delivery_selection("ranked", selected)

    # TODO: clean up this ugly logic, reorganize delivery around people
    results: dict[str, dict[str, Any]] = {d: {} for d in selected}
    # deliveries enabled only via recipients are restricted to those recipients
    personal_deliveries = [d for d in selected if d not in all_global_enabled and d in recipients_enable_deliveries]
    for personal_delivery in personal_deliveries:
        results[personal_delivery].setdefault("recipients", [])
        for recipient in self.all_recipients():
            if personal_delivery in recipient.enabling_delivery_names():
                results[personal_delivery]["recipients"].append(recipient.entity_id)
    return results

366 

def suppress(self, reason: SuppressionReason) -> None:
    """Mark the notification as suppressed for the given reason.

    The latest reason replaces any previous one as the suppression reason, and
    is also added to the list of skip reasons if not already there.
    """
    # NOTE(review): indentation was lost in the listing; the log call is assumed
    # to run on every call rather than only for a new reason - confirm.
    self._suppression_reason = reason
    if reason not in self._skip_reasons:
        self._skip_reasons.append(reason)
    _LOGGER.info(f"SUPERNOTIFY Suppressing notification, reason:{reason}, id:{self.id}")

372 

async def deliver(self) -> bool:
    """Send the notification through each selected delivery, with fallbacks.

    If the notification has been suppressed, every selected delivery is recorded
    as skipped rather than sent. If nothing was delivered and the notification
    was not suppressed, the registry's fallback deliveries are tried.

    Returns:
        True if at least one envelope was delivered.
    """
    # NOTE(review): block nesting was reconstructed from a listing that had lost
    # its indentation - confirm the nesting of the two fallback sections.
    _LOGGER.debug(
        "Message: %s, notification: %s, deliveries: %s",
        self.message,
        self.id,
        self.selected_deliveries,
    )

    for delivery_name, details in self.selected_deliveries.items():
        self.deliveries[delivery_name] = {}
        delivery = self.context.delivery_registry.deliveries.get(delivery_name)
        if self._suppression_reason is not None:
            _LOGGER.info("SUPERNOTIFY Suppressing globally silenced/snoozed notification (%s)", self.id)
            # recorded as SNOOZED whatever the stored suppression reason is
            self.record_result(delivery, suppression_reason=SuppressionReason.SNOOZED)
        elif delivery:
            await self.call_transport(delivery, recipients=details.get("recipients"))
        else:
            _LOGGER.error(f"SUPERNOTIFY Unexpected missing delivery {delivery_name}")

    if self.delivered == 0 and not self._suppression_reason:
        if self.failed == 0 and not self.dupe:
            # nothing sent, nothing failed, not a duplicate: use the default fallbacks
            for delivery in self.context.delivery_registry.fallback_by_default_deliveries:
                _LOGGER.info(
                    "SUPERNOTIFY no delivery succeeded, activating fallback_by_default: %s",
                    delivery.name,
                )
                # a fallback that was already one of the selected deliveries is not retried
                if delivery.name not in self.selected_deliveries:
                    await self.call_transport(delivery)
                    self.fallback += 1

        if self.failed > 0:
            for delivery in self.context.delivery_registry.fallback_on_error_deliveries:
                _LOGGER.warning(
                    "SUPERNOTIFY delivery failed, activating fallback_on_error: %s",
                    delivery.name,
                )
                if delivery.name not in self.selected_deliveries:
                    await self.call_transport(delivery)
                    self.fallback += 1

    return self.delivered > 0

414 

async def call_transport(self, delivery: Delivery, recipients: list[str] | None = None) -> None:
    """Deliver this notification through one delivery's transport.

    The delivery is skipped, with the reason recorded, when its transport is
    disabled, when the notification priority is not one the delivery accepts,
    or when the delivery's conditions do not hold. Otherwise targets and
    envelopes are generated and each envelope is handed to the transport,
    unless the dupe checker flags it and a resend has not been forced.

    Exceptions are not propagated: a failure on one envelope is recorded on that
    envelope, and any other failure is logged and stored under the delivery's
    "errors" entry.

    Args:
        delivery: the delivery to use.
        recipients: optional recipients passed through to generate_targets().
    """
    # NOTE(review): block nesting was reconstructed from a listing that had lost
    # its indentation - confirm against the original file.
    try:
        transport: Transport = delivery.transport
        if not transport.enabled:
            self.record_result(delivery, suppression_reason=SuppressionReason.TRANSPORT_DISABLED)
            _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on transport disabled", delivery)
            return

        delivery_priorities: list[str] = delivery.priority
        # an empty priority list on the delivery means no priority restriction
        if self.priority and delivery_priorities and self.priority not in delivery_priorities:
            _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on priority (%s)", delivery, self.priority)
            self.record_result(delivery, suppression_reason=SuppressionReason.PRIORITY)
            return
        if not delivery.evaluate_conditions(self.condition_variables):
            _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on conditions", delivery)
            self.record_result(delivery, suppression_reason=SuppressionReason.DELIVERY_CONDITION)
            return

        targets: list[Target] = self.generate_targets(delivery, recipients=recipients)
        envelopes: list[Envelope] = self.generate_envelopes(delivery, targets)
        if not envelopes:
            # nothing to send: record why, distinguishing a missing mandatory target
            if delivery.target_required == TargetRequired.ALWAYS and (
                not targets or not any(t.has_resolved_target() for t in targets)
            ):
                reason: SuppressionReason = SuppressionReason.NO_TARGET
            else:
                reason = SuppressionReason.UNKNOWN
            self.record_result(delivery, targets=targets, suppression_reason=reason)

        for envelope in envelopes:
            if not self.force_resend and self.context.dupe_checker.check(envelope):
                _LOGGER.debug("SUPERNOTIFY Suppressing dupe envelope, %s", self.message)
                self.record_result(delivery, envelope, suppression_reason=SuppressionReason.DUPE)
                continue
            try:
                if not await transport.deliver(envelope, debug_trace=self.debug_trace):
                    _LOGGER.info(
                        "SUPERNOTIFY No delivery for %s (targets: %s)",
                        delivery.name,
                        envelope.target.as_dict() if envelope.target else "NONE",
                    )
                self.record_result(delivery, envelope)
            except Exception as e2:
                # a failing envelope must not stop the remaining envelopes
                _LOGGER.exception("SUPERNOTIFY Failed to deliver %s: %s", delivery.name, e2)
                envelope.error_count = envelope.error_count + 1
                transport.record_error(str(e2), method="deliver")
                envelope.delivery_error = format_exception(e2)
                self.record_result(delivery, envelope)

    except Exception as e:
        # anything raised outside the per-envelope handling lands here
        _LOGGER.exception(
            "SUPERNOTIFY Failed to notify using delivery %s via %s",
            delivery.name,
            type(delivery.transport).__name__,
        )
        self.deliveries.setdefault(delivery.name, {})
        self.deliveries[delivery.name].setdefault("errors", [])
        errors: list[str] = cast("list[str]", self.deliveries[delivery.name]["errors"])
        errors.append("\n".join(format_exception(e)))

474 

def record_result(
    self,
    delivery: Delivery | None,
    envelope: Envelope | None = None,
    targets: list[Target] | None = None,
    suppression_reason: SuppressionReason | None = None,
) -> None:
    """Debugging (and unit test) support for notifications that failed or were skipped

    Updates the running counters and files the result under the delivery's
    entry in `self.deliveries`.

    With an envelope (and a delivery), the envelope is filed as delivered,
    failed or suppressed. Without an envelope, a skip summary is stored for the
    delivery and the skipped counter is incremented.

    Args:
        delivery: the delivery concerned; None is reported as "!UNKNOWN!" in a skip summary.
        envelope: the envelope that was attempted, if any.
        targets: targets to include in the skip summary, if any.
        suppression_reason: why the envelope or delivery was not sent, if known.
    """
    # NOTE(review): block nesting was reconstructed from a listing that had lost
    # its indentation - confirm against the original file.
    if delivery:
        if envelope:
            self.delivered += envelope.delivered
            self.error_count += envelope.error_count
            self.deliveries.setdefault(delivery.name, {})
            if envelope.delivered:
                self.deliveries[delivery.name].setdefault(KEY_DELIVERED, [])
                self.deliveries[delivery.name][KEY_DELIVERED].append(envelope)  # type: ignore
            else:
                if suppression_reason:
                    envelope.skip_reason = suppression_reason
                    if suppression_reason not in self._skip_reasons:
                        self._skip_reasons.append(suppression_reason)
                    if suppression_reason == SuppressionReason.DUPE:
                        self.dupe = True
                # not delivered: an envelope with errors failed, otherwise it was suppressed
                if envelope.error_count:
                    self.deliveries[delivery.name].setdefault(KEY_FAILED, [])
                    self.deliveries[delivery.name][KEY_FAILED].append(envelope)  # type: ignore
                    self.failed += 1
                else:
                    self.deliveries[delivery.name].setdefault(KEY_SUPPRESSED, [])
                    self.deliveries[delivery.name][KEY_SUPPRESSED].append(envelope)  # type: ignore
                    self.suppressed += 1

    if not envelope:
        delivery_name: str = delivery.name if delivery else "!UNKNOWN!"
        skip_summary: dict[str, Any] = {
            "target_required": delivery.target_required if delivery else "!UNKNOWN!",
            "suppression_reason": str(suppression_reason),
        }
        self.deliveries.setdefault(delivery_name, {})
        if targets:
            skip_summary["targets"] = targets
        # a later skip for the same delivery replaces the earlier summary
        self.deliveries[delivery_name][KEY_SKIPPED] = skip_summary
        self.skipped += 1

518 

def contents(self, diagnostics: bool = False, **_kwargs: Any) -> dict[str, Any]:
    """ArchiveableObject implementation

    Build a sanitized dictionary view of the notification for archiving.

    Args:
        diagnostics: when False (the default) a minimal view is produced, which
            reduces `enabled_scenarios` to its keys and leaves out `debug_trace`.
        **_kwargs: accepted and ignored.

    Returns:
        A dict with the preferred fields first, then the remaining public
        attributes, then the optional fields that are populated, and finally
        a "stats" entry when any delivery was delivered or failed.
    """
    minimal = not diagnostics
    object_refs = ["context", "people_registry", "delivery_registry"]
    keys_only = ["enabled_scenarios"]
    debug_only = ["debug_trace"]
    exposed_if_populated = ["_delivery_error", "message_html", "extra_data", "actions", "_suppression_reason"]
    # fine tune dict order to ease the eye-burden when reviewing archived notifications
    preferred_order = [
        "id",
        "created",
        "message",
        "applied_scenario_names",
        "constrain_scenario_names",
        "required_scenario_names",
        "enabled_scenarios",
        "selected_scenario_names",
        "delivery_selection",
        "delivery_overrides",
        "selected_deliveries",
        "recipients_override",
        "delivered",
        "failed",
        "suppressed",
        "skipped",
        "error_count",
        "deliveries",
    ]
    # preferred fields
    result = {
        k: sanitize(
            self.__dict__[k], minimal=minimal, occupancy_only=True, top_level_keys_only=(minimal and k in keys_only)
        )
        for k in preferred_order
    }
    # all the rest not explicitly excluded
    result.update({
        k: sanitize(v, minimal=minimal, occupancy_only=True)
        for k, v in self.__dict__.items()
        if k not in result
        and k not in exposed_if_populated
        and k not in object_refs
        and not k.startswith("_")
        and (not minimal or k not in keys_only)
        and (not minimal or k not in debug_only)
    })
    # the exposed only if populated fields
    result.update({
        k: sanitize(self.__dict__[k], minimal=minimal, occupancy_only=True)
        for k in exposed_if_populated
        if self.__dict__.get(k)
    })
    # delivery_stats: aggregate delivery metrics
    # Best effort only: a failure here is logged and the archive is still returned.
    try:
        all_durations: dict[str, float] = {}
        total_ok = 0
        total_all = 0
        for d_name, outcomes in self.deliveries.items():
            for envelope in outcomes.get(KEY_DELIVERED, []):
                # elapsed values are multiplied by 1000 to report milliseconds;
                # a missing or null elapsed value counts as zero
                dur = sum((c.contents().get("elapsed") or 0) for c in getattr(envelope, "calls", [])) * 1000
                # accumulate, so a delivery with several envelopes reports all of them
                all_durations[d_name] = all_durations.get(d_name, 0) + dur
                total_ok += 1
                total_all += 1
            for _envelope in outcomes.get(KEY_FAILED, []):
                # failed deliveries appear in the stats with no duration of their own
                all_durations.setdefault(d_name, 0)
                total_all += 1
            if outcomes.get(KEY_SKIPPED):
                total_all += 1
        if all_durations:
            result["stats"] = {
                "total_duration_ms": round(sum(all_durations.values()), 1),
                "slowest_delivery": max(all_durations, key=lambda k: all_durations[k]),
                "fastest_delivery": min(all_durations, key=lambda k: all_durations[k]),
                "delivery_success_rate": round(total_ok / total_all, 2) if total_all else 1.0,
            }
    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY delivery_stats computation failed: %s", e)
    return result

598 

def base_filename(self) -> str:
    """ArchiveableObject implementation

    The name is the creation time to the minute, with colons replaced by
    dashes, followed by an underscore and the notification id.
    """
    minute_stamp = self.created.isoformat()[:16]
    safe_stamp = minute_stamp.replace(":", "-")
    return f"{safe_stamp}_{self.id}"

602 

def delivery_data(self, delivery: Delivery) -> dict[str, Any]:
    """Return the override data supplied in the action call for a delivery.

    The override is looked up by delivery name first, then by the name of the
    delivery's transport. An empty dict is returned when there is no delivery,
    no override, or the override carries no data.
    """
    if delivery is None:
        return {}
    overrides = self.delivery_overrides
    customization: DeliveryCustomization | None = overrides.get(delivery.name)
    if customization is None:
        # no override under the delivery name, so try the transport name
        customization = overrides.get(delivery.transport.name)
    if customization and customization.data:
        return customization.data
    return {}

610 

@property
def delivered_envelopes(self) -> list[Envelope]:
    """All envelopes recorded as delivered, gathered across every delivery."""
    collected: list[Envelope] = []
    for outcomes in self.deliveries.values():
        collected += cast("list[Envelope]", outcomes.get(KEY_DELIVERED, []))
    return collected

617 

@property
def undelivered_envelopes(self) -> list[Envelope]:
    """All envelopes recorded as suppressed or failed, gathered across every delivery.

    For each delivery the suppressed envelopes come before the failed ones.
    """
    collected: list[Envelope] = []
    for outcomes in self.deliveries.values():
        for bucket in (KEY_SUPPRESSED, KEY_FAILED):
            collected.extend(cast("list[Envelope]", outcomes.get(bucket, [])))
    return collected

625 

async def select_scenarios(self) -> list[str]:
    """Return the names of the registered scenarios that evaluate true for this notification.

    Each scenario in the registry is evaluated against the notification's
    condition variables, in registry order.
    """
    matched: list[str] = []
    for scenario in self.context.scenario_registry.scenarios.values():
        if scenario.evaluate(self.condition_variables):
            matched.append(scenario.name)
    return matched

628 

629 def generate_targets(self, delivery: Delivery, recipients: list[str] | None = None) -> list[Target]: 

630 

631 if delivery.target_required == TargetRequired.NEVER: 

632 # don't waste time computing targets for deliveries that don't need them 

633 return [Target(None, target_data=delivery.data)] 

634 

635 computed_target: Target 

636 

637 if delivery.target_usage == TARGET_USE_FIXED: 

638 if delivery.target: 

639 computed_target = delivery.target.safe_copy() 

640 self.debug_trace.record_target(delivery.name, "100_delivery_default_fixed", computed_target) 

641 else: 

642 computed_target = Target(None, target_data=delivery.data) 

643 self.debug_trace.record_target(delivery.name, "101_delivery_default_fixed_empty", computed_target) 

644 elif recipients is not None: 

645 computed_target = Target(recipients) 

646 self.debug_trace.record_target(delivery.name, "102_delivery_default_fixed", computed_target) 

647 

648 elif not self._target: 

649 # Unless there are explicit targets, include everyone on the people registry 

650 computed_target = self.default_person_ids(delivery) 

651 self.debug_trace.record_target(delivery.name, "201_no_action_target", computed_target) 

652 else: 

653 computed_target = self._target.safe_copy() 

654 self.debug_trace.record_target(delivery.name, "202_action_target", computed_target) 

655 

656 # 1st round of filtering for snooze and resolving people->direct targets 

657 computed_target = self.context.snoozer.filter_recipients(computed_target, self.priority, delivery) 

658 self.debug_trace.record_target(delivery.name, "300_post_snooze", computed_target) 

659 # turn person_ids into emails and phone numbers 

660 for indirect_target in self.resolve_indirect_targets(computed_target, delivery): 

661 computed_target += indirect_target 

662 self.debug_trace.record_target(delivery.name, "310_resolve_indirect", computed_target) 

663 computed_target += self.resolve_scenario_targets(delivery) 

664 self.debug_trace.record_target(delivery.name, "320_resolved_scenario_targets", computed_target) 

665 # filter out target not required for this delivery 

666 computed_target = delivery.select_targets(computed_target) 

667 self.debug_trace.record_target(delivery.name, "330_delivery_selection", computed_target) 

668 primary_count = len(computed_target) 

669 

670 if delivery.target_usage == TARGET_USE_ON_NO_DELIVERY_TARGETS: 

671 if not computed_target.has_targets() and delivery.target: 

672 computed_target += delivery.target 

673 self.debug_trace.record_target(delivery.name, "400_delivery_default_no_delivery_targets", computed_target) 

674 elif delivery.target_usage == TARGET_USE_ON_NO_ACTION_TARGETS: 

675 if not self._target and delivery.target: 

676 computed_target += delivery.target 

677 self.debug_trace.record_target(delivery.name, "401_delivery_default_no_action_targets", computed_target) 

678 elif delivery.target_usage == TARGET_USE_MERGE_ON_DELIVERY_TARGETS: 

679 # merge in the delivery defaults if there's a target defined in action call 

680 if computed_target.has_targets() and delivery.target: 

681 computed_target += delivery.target 

682 self.debug_trace.record_target(delivery.name, "402_delivery_merge_on_delivery_targets", computed_target) 

683 elif delivery.target_usage == TARGET_USE_MERGE_ALWAYS: 

684 # merge in the delivery defaults even if there's not a target defined in action call 

685 if delivery.target: 

686 computed_target += delivery.target 

687 self.debug_trace.record_target(delivery.name, "403_delivery_merge_always_targets", computed_target) 

688 elif delivery.target_usage == TARGET_USE_FIXED: 

689 _LOGGER.debug("SUPERNOTIFY Fixed target on delivery %s", delivery.name) 

690 self.debug_trace.record_target(delivery.name, "404_fixed_target", computed_target) 

691 else: 

692 self.debug_trace.record_target(delivery.name, "405_no_target_usage_match", computed_target) 

693 _LOGGER.debug("SUPERNOTIFY No useful target definition for delivery %s", delivery.name) 

694 

695 if len(computed_target) > primary_count: 

696 _LOGGER.debug( 

697 "SUPERNOTIFY Delivery config added %s targets for %s", len(computed_target) - primary_count, delivery.name 

698 ) 

699 

700 # 2nd round of filtering for snooze and resolving people->direct targets after delivery target applied 

701 computed_target = self.context.snoozer.filter_recipients(computed_target, self.priority, delivery) 

702 self.debug_trace.record_target(delivery.name, "501_post_snooze", computed_target) 

703 for indirect_target in self.resolve_indirect_targets(computed_target, delivery): 

704 computed_target += indirect_target 

705 self.debug_trace.record_target(delivery.name, "502_resolved_indirect_targets", computed_target) 

706 computed_target += self.resolve_scenario_targets(delivery) 

707 self.debug_trace.record_target(delivery.name, "503_resolved_scenario_targets", computed_target) 

708 computed_target = delivery.select_targets(computed_target) 

709 self.debug_trace.record_target(delivery.name, "504_delivery_selection", computed_target) 

710 

711 # If the action call explicitly specified a target for this delivery, it takes 

712 # precedence over all resolved/merged targets above. 

713 delivery_override: DeliveryCustomization | None = self.delivery_overrides.get(delivery.name) 

714 if delivery_override is None: 

715 # if override doesn't use a valid delivery name, try a transport name instead 

716 delivery_override = self.delivery_overrides.get(delivery.transport.name) 

717 if delivery_override and delivery_override.target and delivery_override.target.has_targets(): 

718 override_target = delivery_override.target 

719 # handle and resolve indirect targets, like person->mobile device or email 

720 for indirect_target in self.resolve_indirect_targets(override_target, delivery): 

721 override_target += indirect_target 

722 computed_target = delivery.select_targets(override_target) 

723 self.debug_trace.record_target(delivery.name, "600_delivery_override_target", computed_target) 

724 

725 split_targets: list[Target] = computed_target.split_by_target_data() 

726 self.debug_trace.record_target(delivery.name, "610_delivery_split_targets", split_targets) 

727 

728 direct_targets: list[Target] = [t.direct() for t in split_targets] 

729 self.debug_trace.record_target(delivery.name, "620_narrow_to_direct", direct_targets) 

730 

731 if delivery.options.get(OPTION_UNIQUE_TARGETS, False): 

732 direct_targets = [t - self._already_selected for t in direct_targets] 

733 self.debug_trace.record_target(delivery.name, "630_make_unique_across_deliveries", direct_targets) 

734 for direct_target in direct_targets: 

735 self._already_selected += direct_target 

736 self.debug_trace.record_target(delivery.name, "999_final_cut", direct_targets) 

737 return direct_targets 

738 

def resolve_scenario_targets(self, delivery: Delivery) -> Target:
    """Gather the targets that enabled scenarios contribute to a delivery.

    Each enabled scenario is asked for its customization of ``delivery.name``;
    when that customization carries a target reporting ``has_targets()``, the
    target is merged into the result.

    Args:
        delivery: Delivery whose name is used to look up scenario customizations.

    Returns:
        The merged scenario targets, or an empty ``Target`` when no enabled
        scenario supplies any.
    """
    combined: Target = Target()
    for scenario in self.enabled_scenarios.values():
        scenario_custom: DeliveryCustomization | None = scenario.delivery_customization(delivery.name)
        if not scenario_custom:
            # this scenario has nothing configured for the delivery
            continue
        scenario_target = scenario_custom.target
        if scenario_target and scenario_target.has_targets():
            combined += scenario_target
    return combined

746 

def all_recipients(self) -> list[Recipient]:
    """Return the recipients this notification applies to.

    When the notification has an explicit target, only the person ids on that
    target which are present in the people registry and enabled are used.
    Otherwise the registry's enabled recipients are used. In both cases the
    result is then restricted to ``recipients_override`` when one is set.

    Returns:
        A new list of matching recipients.
    """
    if self._target:
        # explicit targets given: keep only registered, enabled people
        known = self.people_registry.people
        candidates: list[Recipient] = []
        for person_id in self._target.person_ids:
            if person_id not in known:
                continue
            person = known[person_id]
            if person.enabled:
                candidates.append(person)
    else:
        # no explicit target: fall back to every enabled recipient
        candidates = self.people_registry.enabled_recipients()

    allowed = self.recipients_override
    if allowed is None:
        # no override in force; still hand back a fresh list
        return list(candidates)
    return [person for person in candidates if person.entity_id in allowed]

761 

def default_person_ids(self, delivery: Delivery) -> Target:
    """Build a person-id target from the recipients matching the delivery's occupancy.

    The people registry is asked for recipients filtered by
    ``delivery.occupancy``; the result is narrowed to ``recipients_override``
    when one is set, and recipients without an entity id are dropped.

    Args:
        delivery: Delivery whose ``occupancy`` setting drives the selection.

    Returns:
        A ``Target`` holding the selected entity ids under ``ATTR_PERSON_ID``.
    """
    # Used as the standard recipient list when neither the service call nor
    # the delivery names a target (per the original note; callers not visible here).
    occupants: list[Recipient] = self.people_registry.filter_recipients_by_occupancy(delivery.occupancy)
    person_ids = []
    for person in occupants:
        if self.recipients_override is not None and person.entity_id not in self.recipients_override:
            continue
        if person.entity_id:
            person_ids.append(person.entity_id)
    return Target({ATTR_PERSON_ID: person_ids})

767 

def resolve_indirect_targets(self, target: Target, delivery: Delivery) -> list[Target]:
    """Expand the person ids on a target into each person's targets for a delivery.

    Every person id on ``target`` is looked up in the people registry. For an
    enabled recipient, ``recipient.target(delivery.name)`` is collected: targets
    carrying ``target_specific_data`` are kept as separate entries, all others
    are merged into one shared target.

    Args:
        target: Target whose ``person_ids`` are to be resolved.
        delivery: Delivery whose name selects the recipient's target.

    Returns:
        A list whose first element is the merged target (possibly empty),
        followed by one entry per recipient target with target-specific data.
    """
    resolved: Target = Target()
    additional: list[Target] = []

    for person_id in target.person_ids:
        recipient: Recipient | None = self.people_registry.people.get(person_id)
        if recipient is None:
            # distinguish an unregistered id from a disabled recipient in the debug log
            _LOGGER.debug("SUPERNOTIFY Skipping recipient %s not found in people registry", person_id)
            continue
        if not recipient.enabled:
            _LOGGER.debug("SUPERNOTIFY Skipping recipient %s with enabled switched off", person_id)
            continue

        recipient_target = recipient.target(delivery.name)
        if recipient_target.target_specific_data:
            # keep separate rather than merging it into the shared target
            additional.append(recipient_target)
        else:
            resolved += recipient_target

    return [resolved, *additional]

785 

def generate_envelopes(self, delivery: Delivery, targets: list[Target]) -> list[Envelope]:
    """Create an envelope for each target that may be sent on a delivery.

    A target is skipped only when it has no resolved target and the delivery's
    ``target_required`` is ``TargetRequired.ALWAYS``. Envelope data is layered
    from lowest to highest priority: delivery data, target data, data from each
    enabled scenario's customization of this delivery, then action-call data
    excluding ``INTERNAL_DATA_KEYS``.

    Args:
        delivery: Delivery the envelopes are generated for.
        targets: Candidate targets, one potential envelope each.

    Returns:
        The generated envelopes, in the order of ``targets``.
    """
    generated: list[Envelope] = []
    for candidate in targets:
        # an unresolved target still gets an envelope unless the delivery always needs one
        if not candidate.has_resolved_target() and delivery.target_required == TargetRequired.ALWAYS:
            continue

        payload: dict[str, Any] = {}

        # least priority - delivery derived data
        payload.update(delivery.data)

        # next least priority - target derived data
        if candidate.target_data:
            payload.update(candidate.target_data)

        # scenario applied at cross-delivery level in apply_enabled_scenarios
        for scenario in self.enabled_scenarios.values():
            scenario_custom: DeliveryCustomization | None = scenario.delivery_customization(delivery.name)
            if scenario_custom and scenario_custom.data:
                payload.update(scenario_custom.data)

        # action call data goes in last so that it wins over everything above
        for key, value in self.extra_data.items():
            if key not in INTERNAL_DATA_KEYS:
                payload[key] = value

        generated.append(Envelope(delivery, self, candidate, payload, context=self.context))

    return generated