Coverage for custom_components/supernotify/notification.py: 90%
500 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
1import datetime as dt
2import logging
3import uuid
4from traceback import format_exception
5from typing import TYPE_CHECKING, Any, cast
7import homeassistant.util.dt as dt_util
8import voluptuous as vol
9from homeassistant.components.notify.const import ATTR_DATA
10from voluptuous import humanize
12from custom_components.supernotify.schema import SelectionRank
14from .archive import ArchivableObject
15from .common import ensure_list, nullable_ensure_list, sanitize
16from .const import (
17 ATTR_ACTION_GROUPS,
18 ATTR_ACTIONS,
19 ATTR_DEBUG,
20 ATTR_DELIVERY,
21 ATTR_DELIVERY_SELECTION,
22 ATTR_FORCE_RESEND,
23 ATTR_MEDIA,
24 ATTR_MEDIA_CLIP_URL,
25 ATTR_MEDIA_SNAPSHOT_URL,
26 ATTR_MESSAGE_HTML,
27 ATTR_PERSON_ID,
28 ATTR_PRIORITY,
29 ATTR_RECIPIENTS,
30 ATTR_SCENARIOS_APPLY,
31 ATTR_SCENARIOS_CONSTRAIN,
32 ATTR_SCENARIOS_REQUIRE,
33 ATTR_SPOKEN_MESSAGE,
34 DELIVERY_SELECTION_EXPLICIT,
35 DELIVERY_SELECTION_FIXED,
36 DELIVERY_SELECTION_IMPLICIT,
37 OPTION_UNIQUE_TARGETS,
38 PRIORITY_MEDIUM,
39 PRIORITY_VALUES,
40 TARGET_USE_FIXED,
41 TARGET_USE_MERGE_ALWAYS,
42 TARGET_USE_MERGE_ON_DELIVERY_TARGETS,
43 TARGET_USE_ON_NO_ACTION_TARGETS,
44 TARGET_USE_ON_NO_DELIVERY_TARGETS,
45)
46from .envelope import Envelope
47from .model import (
48 ConditionVariables,
49 DebugTrace,
50 DeliveryCustomization,
51 SuppressionReason,
52 Target,
53 TargetRequired,
54)
55from .schema import ACTION_DATA_SCHEMA, STRICT_ACTION_DATA_SCHEMA, Outcome
57if TYPE_CHECKING:
58 from .context import Context
59 from .delivery import Delivery, DeliveryRegistry
60 from .people import PeopleRegistry, Recipient
61 from .scenario import Scenario
62 from .transport import (
63 Transport,
64 )
66_LOGGER = logging.getLogger(__name__)
68# Deliveries mapping keys for debug / archive
69KEY_DELIVERED = "delivered"
70KEY_SUPPRESSED = "suppressed"
71KEY_FAILED = "failed"
72KEY_SKIPPED = "skipped"
74# supernotify specific data items not to be passed to transports in data
75INTERNAL_DATA_KEYS = (ATTR_FORCE_RESEND, ATTR_SPOKEN_MESSAGE)
77type t_delivery_name = str
78type t_outcome = str
81class Notification(ArchivableObject):
82 def __init__(
83 self,
84 context: Context,
85 message: str | None = None,
86 title: str | None = None,
87 target: list[str] | str | None = None,
88 action_data: dict[str, Any] | None = None,
89 ) -> None:
90 self.created: dt.datetime = dt.datetime.now(tz=dt_util.get_default_time_zone())
91 self.debug_trace: DebugTrace = DebugTrace(message=message, title=title, data=action_data, target=target)
92 self.message: str | None = message
93 self.context: Context = context
94 self.people_registry: PeopleRegistry = context.people_registry
95 self.delivery_registry: DeliveryRegistry = context.delivery_registry
96 action_data = action_data or {}
97 self._target: Target | None = Target(target) if target else None
98 self._already_selected: Target = Target()
99 self._title: str | None = title
100 self.id = str(uuid.uuid1())
101 self.delivered: int = 0
102 self.error_count: int = 0
103 self.skipped: int = 0
104 self.failed: int = 0
105 self.suppressed: int = 0
106 self.fallback: int = 0
107 self.dupe: bool = False
108 self.deliveries: dict[t_delivery_name, dict[t_outcome, list[str] | list[Envelope] | dict[str, Any]]] = {}
109 self._skip_reasons: list[SuppressionReason] = []
111 self.validate_action_data(action_data)
112 # for compatibility with other notify calls, pass thru surplus data to underlying delivery transports
113 self.extra_data: dict[str, Any] = {
114 k: v for k, v in action_data.items() if k not in STRICT_ACTION_DATA_SCHEMA(action_data)
115 }
117 action_data = {k: v for k, v in action_data.items() if k not in self.extra_data}
118 self.extra_data.update(action_data.get(ATTR_DATA, {})) # nested `data` could be supernotify or target service
120 self.priority: str = action_data.get(ATTR_PRIORITY, PRIORITY_MEDIUM)
121 self.message_html: str | None = action_data.get(ATTR_MESSAGE_HTML)
122 self.force_resend: bool = action_data.get(ATTR_FORCE_RESEND, False)
123 self.required_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_REQUIRE))
124 self.applied_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_APPLY))
125 self.constrain_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_CONSTRAIN))
126 self.delivery_selection: str | None = action_data.get(ATTR_DELIVERY_SELECTION)
127 self.delivery_overrides: dict[str, DeliveryCustomization] = {}
129 delivery_data = action_data.get(ATTR_DELIVERY)
130 if isinstance(delivery_data, list):
131 # a bare list of deliveries implies intent to restrict
132 _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as explicit for list %s", delivery_data)
133 if self.delivery_selection is None:
134 self.delivery_selection = DELIVERY_SELECTION_EXPLICIT
135 self.delivery_overrides = {k: DeliveryCustomization({}) for k in action_data.get(ATTR_DELIVERY, [])}
136 elif isinstance(delivery_data, str) and delivery_data:
137 # a bare list of deliveries implies intent to restrict
138 _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as explicit for single %s", delivery_data)
139 if self.delivery_selection is None:
140 self.delivery_selection = DELIVERY_SELECTION_EXPLICIT
141 self.delivery_overrides = {delivery_data: DeliveryCustomization({})}
142 elif isinstance(delivery_data, dict):
143 # whereas a dict may be used to tune or restrict
144 if self.delivery_selection is None:
145 self.delivery_selection = DELIVERY_SELECTION_IMPLICIT
146 _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as implicit for mapping %s", delivery_data)
147 self.delivery_overrides = {k: DeliveryCustomization(v) for k, v in action_data.get(ATTR_DELIVERY, {}).items()}
148 elif delivery_data:
149 _LOGGER.warning("SUPERNOTIFY Unable to interpret delivery data %s", delivery_data)
150 if self.delivery_selection is None:
151 self.delivery_selection = DELIVERY_SELECTION_IMPLICIT
152 else:
153 if self.delivery_selection is None:
154 self.delivery_selection = DELIVERY_SELECTION_IMPLICIT
156 self.action_groups: list[str] | None = nullable_ensure_list(action_data.get(ATTR_ACTION_GROUPS))
157 self.recipients_override: list[str] | None = nullable_ensure_list(action_data.get(ATTR_RECIPIENTS))
158 self.media: dict[str, Any] = action_data.get(ATTR_MEDIA) or {}
159 self.debug: bool = action_data.get(ATTR_DEBUG, False)
160 self.actions: list[dict[str, Any]] = ensure_list(action_data.get(ATTR_ACTIONS))
162 self.selected_deliveries: dict[str, dict[str, Any]] = {}
163 self.enabled_scenarios: dict[str, Scenario] = {}
164 self.selected_scenario_names: list[str] = []
165 self._suppression_reason: SuppressionReason | None = None
166 self._delivery_error: list[str] | None = None
167 self.condition_variables: ConditionVariables
169 async def initialize(self) -> None:
170 """Async post-construction initialization"""
171 self.occupancy: dict[str, list[Recipient]] = self.people_registry.determine_occupancy()
172 self.condition_variables = ConditionVariables(
173 self.applied_scenario_names,
174 self.required_scenario_names,
175 self.constrain_scenario_names,
176 self.priority,
177 self.occupancy,
178 self.message,
179 self._title,
180 self.extra_data,
181 ) # requires occupancy first
183 enabled_scenario_names: list[str] = list(self.applied_scenario_names) or []
184 self.selected_scenario_names = await self.select_scenarios()
185 enabled_scenario_names.extend(self.selected_scenario_names)
186 if self.constrain_scenario_names:
187 enabled_scenario_names = [
188 s for s in enabled_scenario_names if (s in self.constrain_scenario_names or s in self.applied_scenario_names)
189 ]
190 if self.required_scenario_names and not any(s in enabled_scenario_names for s in self.required_scenario_names):
191 _LOGGER.info("SUPERNOTIFY suppressing notification, no required scenarios enabled")
192 self.selected_deliveries = {}
193 self.suppress(SuppressionReason.NO_SCENARIO)
194 else:
195 for s in enabled_scenario_names:
196 scenario_obj = self.context.scenario_registry.scenarios.get(s)
197 if scenario_obj is not None:
198 self.enabled_scenarios[s] = scenario_obj
200 self.selected_deliveries = self.select_deliveries()
201 if self.context.snoozer.is_global_snooze(self.priority):
202 self.suppress(SuppressionReason.SNOOZED)
203 self.apply_enabled_scenarios()
205 if not self.media:
206 self.media = self.media_requirements(self.extra_data)
208 def outcome(self) -> Outcome:
209 if self.error_count > 0:
210 return Outcome.ERROR
211 if self.dupe:
212 return Outcome.DUPE
213 if not self.delivered:
214 return Outcome.NO_DELIVERY
215 if self.fallback:
216 return Outcome.FALLBACK_DELIVERY
217 if self.skipped:
218 return Outcome.PARTIAL_DELIVERY
219 return Outcome.SUCCESS
221 def media_requirements(self, data: dict[str, Any]) -> dict[str, Any]:
222 """If no media defined, look for iOS / Android actions that have media defined
224 Example is the Frigate blueprint, which generates `image`, `video` etc
225 in the `data` section, that can also be used for email attachments
226 """
227 media_dict = {}
228 if not data:
229 return {}
230 if data.get("image"):
231 media_dict[ATTR_MEDIA_SNAPSHOT_URL] = data.get("image")
232 if data.get("video"):
233 media_dict[ATTR_MEDIA_CLIP_URL] = data.get("video")
234 if data.get("attachment", {}).get("url"):
235 url = data["attachment"]["url"]
236 if url and url.endswith(".mp4") and not media_dict.get(ATTR_MEDIA_CLIP_URL):
237 media_dict[ATTR_MEDIA_CLIP_URL] = url
238 elif (
239 url
240 and (url.endswith(".jpg") or url.endswith(".jpeg") or url.endswith(".png"))
241 and not media_dict.get(ATTR_MEDIA_SNAPSHOT_URL)
242 ):
243 media_dict[ATTR_MEDIA_SNAPSHOT_URL] = url
244 return media_dict
246 def validate_action_data(self, action_data: dict[str, Any]) -> None:
247 if action_data.get(ATTR_PRIORITY):
248 if isinstance(action_data.get(ATTR_PRIORITY), (str, int, float)):
249 if action_data.get(ATTR_PRIORITY) not in PRIORITY_VALUES:
250 _LOGGER.info("SUPERNOTIFY custom priority %s", action_data.get(ATTR_PRIORITY))
251 else:
252 _LOGGER.info("SUPERNOTIFY Invalid priority %s", action_data.get(ATTR_PRIORITY))
253 self.suppress(SuppressionReason.INVALID_ACTION_DATA)
254 raise vol.Invalid("Priority value must be a simple value")
255 try:
256 humanize.validate_with_humanized_errors(action_data, ACTION_DATA_SCHEMA)
257 except vol.Invalid as e:
258 _LOGGER.warning("SUPERNOTIFY invalid action data %s: %s", action_data, e)
259 self.suppress(SuppressionReason.INVALID_ACTION_DATA)
260 raise
261 except vol.error.Error as e2:
262 _LOGGER.warning("SUPERNOTIFY failed to validate action data %s: %s", action_data, e2)
263 self.suppress(SuppressionReason.INVALID_ACTION_DATA)
264 raise vol.Invalid(f"Unable to validate action data - {e2}") from e2
266 def apply_enabled_scenarios(self) -> None:
267 """Set media and action_groups from scenario if defined, first come first applied"""
268 action_groups: list[str] = []
269 for scenario in self.enabled_scenarios.values():
270 if scenario.media:
271 if self.media:
272 self.media.update(scenario.media)
273 else:
274 self.media = scenario.media
275 if scenario.action_groups:
276 action_groups.extend(ag for ag in scenario.action_groups if ag not in action_groups)
277 # self.action_groups only accessed from inside Envelope
278 if self.action_groups:
279 self.action_groups.extend(action_groups)
280 else:
281 self.action_groups = action_groups
283 def select_deliveries(self) -> dict[str, dict[str, Any]]:
284 scenario_enable_deliveries: list[str] = []
285 scenario_disable_deliveries: list[str] = []
286 default_enable_deliveries: list[str] = []
287 recipients_enable_deliveries: list[str] = []
289 if self.delivery_selection != DELIVERY_SELECTION_FIXED:
290 for scenario in self.enabled_scenarios.values():
291 scenario_enable_deliveries.extend(scenario.enabling_deliveries())
292 for scenario in self.enabled_scenarios.values():
293 scenario_disable_deliveries.extend(scenario.disabling_deliveries())
295 scenario_enable_deliveries = list(set(scenario_enable_deliveries))
296 scenario_disable_deliveries = list(set(scenario_disable_deliveries))
298 for recipient in self.all_recipients():
299 recipients_enable_deliveries.extend(recipient.enabling_delivery_names())
300 if self.delivery_selection == DELIVERY_SELECTION_IMPLICIT:
301 # all deliveries with SELECTION_DEFAULT in CONF_SELECTION
302 default_enable_deliveries = [d.name for d in self.context.delivery_registry.implicit_deliveries]
304 self.debug_trace.record_delivery_selection("scenario_enable_deliveries", scenario_enable_deliveries)
305 self.debug_trace.record_delivery_selection("scenario_disable_deliveries", scenario_disable_deliveries)
306 self.debug_trace.record_delivery_selection("default_enable_deliveries", default_enable_deliveries)
307 self.debug_trace.record_delivery_selection("recipient_enable_deliveries", recipients_enable_deliveries)
309 override_enable_deliveries: list[str] = []
310 override_disable_deliveries: list[str] = []
312 # apply the deliveries defined in the notification action call
313 for delivery, delivery_override in self.delivery_overrides.items():
314 if (
315 (delivery_override is None or delivery_override.enabled is True)
316 and delivery in self.context.delivery_registry.enabled_deliveries
317 ) or (
318 (delivery_override is not None and delivery_override.enabled is True)
319 and delivery in self.context.delivery_registry.disabled_deliveries
320 ):
321 override_enable_deliveries.append(delivery)
322 elif delivery_override is not None and delivery_override.enabled is False:
323 override_disable_deliveries.append(delivery)
325 # if self.delivery_selection != DELIVERY_SELECTION_FIXED:
326 # scenario_disable_deliveries = [
327 # d.name
328 # for d in self.context.delivery_registry.deliveries.values()
329 # if d.selection == [SELECTION_BY_SCENARIO]
330 # and d.name not in scenario_enable_deliveries
331 # and (d.name not in override_enable_deliveries or self.delivery_selection != DELIVERY_SELECTION_EXPLICIT)
332 # ]
333 all_global_enabled: list[str] = list(
334 set(scenario_enable_deliveries + default_enable_deliveries + override_enable_deliveries)
335 )
336 all_enabled: list[str] = all_global_enabled + recipients_enable_deliveries
337 # override_enable_deliveries takes precedence: if the action call explicitly
338 # re-enables a delivery that a scenario disabled, remove it from all_disabled.
339 all_disabled: list[str] = [
340 d for d in scenario_disable_deliveries + override_disable_deliveries if d not in override_enable_deliveries
341 ]
342 override_enabled: list[str] = list(set(scenario_enable_deliveries + override_enable_deliveries))
343 self.debug_trace.record_delivery_selection("override_disable_deliveries", override_disable_deliveries)
344 self.debug_trace.record_delivery_selection("override_enable_deliveries", override_enable_deliveries)
346 unsorted_maybe_objs: list[Delivery | None] = [
347 self.delivery_registry.deliveries.get(d) for d in all_enabled if d not in all_disabled
348 ]
349 unsorted_objs: list[Delivery] = [
350 d for d in unsorted_maybe_objs if d is not None and (d.enabled or d.name in override_enabled)
351 ]
352 first: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.FIRST]
353 anywhere: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.ANY]
354 last: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.LAST]
355 selected = first + anywhere + last
356 self.debug_trace.record_delivery_selection("ranked", selected)
358 # TODO: clean up this ugly logic, reorganize delivery around people
359 results: dict[str, dict[str, Any]] = {d: {} for d in selected}
360 personal_deliveries = [d for d in selected if d not in all_global_enabled and d in recipients_enable_deliveries]
361 for personal_delivery in personal_deliveries:
362 results[personal_delivery].setdefault("recipients", [])
363 for recipient in self.all_recipients():
364 if personal_delivery in recipient.enabling_delivery_names():
365 results[personal_delivery]["recipients"].append(recipient.entity_id)
366 return results
368 def suppress(self, reason: SuppressionReason) -> None:
369 self._suppression_reason = reason
370 if reason not in self._skip_reasons:
371 self._skip_reasons.append(reason)
372 _LOGGER.info(f"SUPERNOTIFY Suppressing notification, reason:{reason}, id:{self.id}")
374 async def deliver(self) -> bool:
375 _LOGGER.debug(
376 "Message: %s, notification: %s, deliveries: %s",
377 self.message,
378 self.id,
379 self.selected_deliveries,
380 )
382 for delivery_name, details in self.selected_deliveries.items():
383 self.deliveries[delivery_name] = {}
384 delivery = self.context.delivery_registry.deliveries.get(delivery_name)
385 if self._suppression_reason is not None:
386 _LOGGER.info("SUPERNOTIFY Suppressing globally silenced/snoozed notification (%s)", self.id)
387 self.record_result(delivery, suppression_reason=SuppressionReason.SNOOZED)
388 elif delivery:
389 await self.call_transport(delivery, recipients=details.get("recipients"))
390 else:
391 _LOGGER.error(f"SUPERNOTIFY Unexpected missing delivery {delivery_name}")
393 if self.delivered == 0 and not self._suppression_reason:
394 if self.failed == 0 and not self.dupe:
395 for delivery in self.context.delivery_registry.fallback_by_default_deliveries:
396 _LOGGER.info(
397 "SUPERNOTIFY no delivery succeeded, activating fallback_by_default: %s",
398 delivery.name,
399 )
400 if delivery.name not in self.selected_deliveries:
401 await self.call_transport(delivery)
402 self.fallback += 1
404 if self.failed > 0:
405 for delivery in self.context.delivery_registry.fallback_on_error_deliveries:
406 _LOGGER.warning(
407 "SUPERNOTIFY delivery failed, activating fallback_on_error: %s",
408 delivery.name,
409 )
410 if delivery.name not in self.selected_deliveries:
411 await self.call_transport(delivery)
412 self.fallback += 1
414 return self.delivered > 0
416 async def call_transport(self, delivery: Delivery, recipients: list[str] | None = None) -> None:
417 try:
418 transport: Transport = delivery.transport
419 if not transport.enabled:
420 self.record_result(delivery, suppression_reason=SuppressionReason.TRANSPORT_DISABLED)
421 _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on transport disabled", delivery)
422 return
424 delivery_priorities: list[str] = delivery.priority
425 if self.priority and delivery_priorities and self.priority not in delivery_priorities:
426 _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on priority (%s)", delivery, self.priority)
427 self.record_result(delivery, suppression_reason=SuppressionReason.PRIORITY)
428 return
429 if not delivery.evaluate_conditions(self.condition_variables):
430 _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on conditions", delivery)
431 self.record_result(delivery, suppression_reason=SuppressionReason.DELIVERY_CONDITION)
432 return
434 targets: list[Target] = self.generate_targets(delivery, recipients=recipients)
435 envelopes: list[Envelope] = self.generate_envelopes(delivery, targets)
436 if not envelopes:
437 if delivery.target_required == TargetRequired.ALWAYS and (
438 not targets or not any(t.has_resolved_target() for t in targets)
439 ):
440 reason: SuppressionReason = SuppressionReason.NO_TARGET
441 else:
442 reason = SuppressionReason.UNKNOWN
443 self.record_result(delivery, targets=targets, suppression_reason=reason)
445 for envelope in envelopes:
446 if not self.force_resend and self.context.dupe_checker.check(envelope):
447 _LOGGER.debug("SUPERNOTIFY Suppressing dupe envelope, %s", self.message)
448 self.record_result(delivery, envelope, suppression_reason=SuppressionReason.DUPE)
449 continue
450 try:
451 if not await transport.deliver(envelope, debug_trace=self.debug_trace):
452 _LOGGER.info(
453 "SUPERNOTIFY No delivery for %s (targets: %s)",
454 delivery.name,
455 envelope.target.as_dict() if envelope.target else "NONE",
456 )
457 self.record_result(delivery, envelope)
458 except Exception as e2:
459 _LOGGER.exception("SUPERNOTIFY Failed to deliver %s: %s", delivery.name, e2)
460 envelope.error_count = envelope.error_count + 1
461 transport.record_error(str(e2), method="deliver")
462 envelope.delivery_error = format_exception(e2)
463 self.record_result(delivery, envelope)
465 except Exception as e:
466 _LOGGER.exception(
467 "SUPERNOTIFY Failed to notify using delivery %s via %s",
468 delivery.name,
469 type(delivery.transport).__name__,
470 )
471 self.deliveries.setdefault(delivery.name, {})
472 self.deliveries[delivery.name].setdefault("errors", [])
473 errors: list[str] = cast("list[str]", self.deliveries[delivery.name]["errors"])
474 errors.append("\n".join(format_exception(e)))
476 def record_result(
477 self,
478 delivery: Delivery | None,
479 envelope: Envelope | None = None,
480 targets: list[Target] | None = None,
481 suppression_reason: SuppressionReason | None = None,
482 ) -> None:
483 """Debugging (and unit test) support for notifications that failed or were skipped"""
484 if delivery:
485 if envelope:
486 self.delivered += envelope.delivered
487 self.error_count += envelope.error_count
488 self.deliveries.setdefault(delivery.name, {})
489 if envelope.delivered:
490 self.deliveries[delivery.name].setdefault(KEY_DELIVERED, [])
491 self.deliveries[delivery.name][KEY_DELIVERED].append(envelope) # type: ignore
492 else:
493 if suppression_reason:
494 envelope.skip_reason = suppression_reason
495 if suppression_reason not in self._skip_reasons:
496 self._skip_reasons.append(suppression_reason)
497 if suppression_reason == SuppressionReason.DUPE:
498 self.dupe = True
499 if envelope.error_count:
500 self.deliveries[delivery.name].setdefault(KEY_FAILED, [])
501 self.deliveries[delivery.name][KEY_FAILED].append(envelope) # type: ignore
502 self.failed += 1
503 else:
504 self.deliveries[delivery.name].setdefault(KEY_SUPPRESSED, [])
505 self.deliveries[delivery.name][KEY_SUPPRESSED].append(envelope) # type: ignore
506 self.suppressed += 1
508 if not envelope:
509 delivery_name: str = delivery.name if delivery else "!UNKNOWN!"
510 skip_summary: dict[str, Any] = {
511 "target_required": delivery.target_required if delivery else "!UNKNOWN!",
512 "suppression_reason": str(suppression_reason),
513 }
514 self.deliveries.setdefault(delivery_name, {})
515 if targets:
516 skip_summary["targets"] = targets
517 self.deliveries[delivery_name][KEY_SKIPPED] = skip_summary
518 self.skipped += 1
520 def contents(self, diagnostics: bool = False, **_kwargs: Any) -> dict[str, Any]:
521 """ArchiveableObject implementation"""
522 minimal = not diagnostics
523 object_refs = ["context", "people_registry", "delivery_registry"]
524 keys_only = ["enabled_scenarios"]
525 debug_only = ["debug_trace"]
526 exposed_if_populated = ["_delivery_error", "message_html", "extra_data", "actions", "_suppression_reason"]
527 # fine tune dict order to ease the eye-burden when reviewing archived notifications
528 preferred_order = [
529 "id",
530 "created",
531 "message",
532 "applied_scenario_names",
533 "constrain_scenario_names",
534 "required_scenario_names",
535 "enabled_scenarios",
536 "selected_scenario_names",
537 "delivery_selection",
538 "delivery_overrides",
539 "delivery_selection",
540 "selected_deliveries",
541 "recipients_override",
542 "delivered",
543 "failed",
544 "suppressed",
545 "skipped",
546 "error_count",
547 "deliveries",
548 ]
549 # preferred fields
550 result = {
551 k: sanitize(
552 self.__dict__[k], minimal=minimal, occupancy_only=True, top_level_keys_only=(minimal and k in keys_only)
553 )
554 for k in preferred_order
555 }
556 # all the rest not explicitly excluded
557 result.update({
558 k: sanitize(v, minimal=minimal, occupancy_only=True)
559 for k, v in self.__dict__.items()
560 if k not in result
561 and k not in exposed_if_populated
562 and k not in object_refs
563 and not k.startswith("_")
564 and (not minimal or k not in keys_only)
565 and (not minimal or k not in debug_only)
566 })
567 # the exposed only if populated fields
568 result.update({
569 k: sanitize(self.__dict__[k], minimal=minimal, occupancy_only=True)
570 for k in exposed_if_populated
571 if self.__dict__.get(k)
572 })
573 # delivery_stats: aggregate delivery metrics
574 try:
575 all_durations: dict[str, float] = {}
576 total_ok = 0
577 total_all = 0
578 for d_name, outcomes in self.deliveries.items():
579 for envelope in outcomes.get(KEY_DELIVERED, []):
580 dur = sum(c.contents().get("elapsed", 0) for c in getattr(envelope, "calls", [])) * 1000
581 all_durations[d_name] = dur
582 total_ok += 1
583 total_all += 1
584 for _envelope in outcomes.get(KEY_FAILED, []):
585 all_durations.setdefault(d_name, 0)
586 total_all += 1
587 if outcomes.get(KEY_SKIPPED):
588 total_all += 1
589 if all_durations:
590 result["stats"] = {
591 "total_duration_ms": round(sum(all_durations.values()), 1),
592 "slowest_delivery": max(all_durations, key=lambda k: all_durations[k]),
593 "fastest_delivery": min(all_durations, key=lambda k: all_durations[k]),
594 "delivery_success_rate": round(total_ok / total_all, 2) if total_all else 1.0,
595 }
596 except Exception as e:
597 _LOGGER.warning("SUPERNOTIFY delivery_stats computation failed: %s", e)
598 return result
600 def base_filename(self) -> str:
601 """ArchiveableObject implementation"""
602 return f"{self.created.isoformat()[:16].replace(':', '-')}_{self.id}"
604 def delivery_data(self, delivery: Delivery) -> dict[str, Any]:
605 if delivery is None:
606 return {}
607 delivery_override: DeliveryCustomization | None = self.delivery_overrides.get(delivery.name)
608 if delivery_override is None:
609 delivery_override = self.delivery_overrides.get(delivery.transport.name)
610 return delivery_override.data if delivery_override and delivery_override.data else {}
612 @property
613 def delivered_envelopes(self) -> list[Envelope]:
614 result: list[Envelope] = []
615 for delivery_result in self.deliveries.values():
616 result.extend(cast("list[Envelope]", delivery_result.get(KEY_DELIVERED, [])))
617 return result
619 @property
620 def undelivered_envelopes(self) -> list[Envelope]:
621 result: list[Envelope] = []
622 for delivery_result in self.deliveries.values():
623 result.extend(cast("list[Envelope]", delivery_result.get(KEY_SUPPRESSED, [])))
624 result.extend(cast("list[Envelope]", delivery_result.get(KEY_FAILED, [])))
625 return result
627 async def select_scenarios(self) -> list[str]:
628 return [s.name for s in self.context.scenario_registry.scenarios.values() if s.evaluate(self.condition_variables)]
630 def generate_targets(self, delivery: Delivery, recipients: list[str] | None = None) -> list[Target]:
632 if delivery.target_required == TargetRequired.NEVER:
633 # don't waste time computing targets for deliveries that don't need them
634 return [Target(None, target_data=delivery.data)]
636 computed_target: Target
638 if delivery.target_usage == TARGET_USE_FIXED:
639 if delivery.target:
640 computed_target = delivery.target.safe_copy()
641 self.debug_trace.record_target(delivery.name, "100_delivery_default_fixed", computed_target)
642 else:
643 computed_target = Target(None, target_data=delivery.data)
644 self.debug_trace.record_target(delivery.name, "101_delivery_default_fixed_empty", computed_target)
645 elif recipients is not None:
646 computed_target = Target(recipients)
647 self.debug_trace.record_target(delivery.name, "102_delivery_default_fixed", computed_target)
649 elif not self._target:
650 # Unless there are explicit targets, include everyone on the people registry
651 computed_target = self.default_person_ids(delivery)
652 self.debug_trace.record_target(delivery.name, "201_no_action_target", computed_target)
653 else:
654 computed_target = self._target.safe_copy()
655 self.debug_trace.record_target(delivery.name, "202_action_target", computed_target)
657 # 1st round of filtering for snooze and resolving people->direct targets
658 computed_target = self.context.snoozer.filter_recipients(computed_target, self.priority, delivery)
659 self.debug_trace.record_target(delivery.name, "300_post_snooze", computed_target)
660 # turn person_ids into emails and phone numbers
661 for indirect_target in self.resolve_indirect_targets(computed_target, delivery):
662 computed_target += indirect_target
663 self.debug_trace.record_target(delivery.name, "310_resolve_indirect", computed_target)
664 computed_target += self.resolve_scenario_targets(delivery)
665 self.debug_trace.record_target(delivery.name, "320_resolved_scenario_targets", computed_target)
666 # filter out target not required for this delivery
667 computed_target = delivery.select_targets(computed_target)
668 self.debug_trace.record_target(delivery.name, "330_delivery_selection", computed_target)
669 primary_count = len(computed_target)
671 if delivery.target_usage == TARGET_USE_ON_NO_DELIVERY_TARGETS:
672 if not computed_target.has_targets() and delivery.target:
673 computed_target += delivery.target
674 self.debug_trace.record_target(delivery.name, "400_delivery_default_no_delivery_targets", computed_target)
675 elif delivery.target_usage == TARGET_USE_ON_NO_ACTION_TARGETS:
676 if not self._target and delivery.target:
677 computed_target += delivery.target
678 self.debug_trace.record_target(delivery.name, "401_delivery_default_no_action_targets", computed_target)
679 elif delivery.target_usage == TARGET_USE_MERGE_ON_DELIVERY_TARGETS:
680 # merge in the delivery defaults if there's a target defined in action call
681 if computed_target.has_targets() and delivery.target:
682 computed_target += delivery.target
683 self.debug_trace.record_target(delivery.name, "402_delivery_merge_on_delivery_targets", computed_target)
684 elif delivery.target_usage == TARGET_USE_MERGE_ALWAYS:
685 # merge in the delivery defaults even if there's not a target defined in action call
686 if delivery.target:
687 computed_target += delivery.target
688 self.debug_trace.record_target(delivery.name, "403_delivery_merge_always_targets", computed_target)
689 elif delivery.target_usage == TARGET_USE_FIXED:
690 _LOGGER.debug("SUPERNOTIFY Fixed target on delivery %s", delivery.name)
691 self.debug_trace.record_target(delivery.name, "404_fixed_target", computed_target)
692 else:
693 self.debug_trace.record_target(delivery.name, "405_no_target_usage_match", computed_target)
694 _LOGGER.debug("SUPERNOTIFY No useful target definition for delivery %s", delivery.name)
696 if len(computed_target) > primary_count:
697 _LOGGER.debug(
698 "SUPERNOTIFY Delivery config added %s targets for %s", len(computed_target) - primary_count, delivery.name
699 )
701 # 2nd round of filtering for snooze and resolving people->direct targets after delivery target applied
702 computed_target = self.context.snoozer.filter_recipients(computed_target, self.priority, delivery)
703 self.debug_trace.record_target(delivery.name, "501_post_snooze", computed_target)
704 for indirect_target in self.resolve_indirect_targets(computed_target, delivery):
705 computed_target += indirect_target
706 self.debug_trace.record_target(delivery.name, "502_resolved_indirect_targets", computed_target)
707 computed_target += self.resolve_scenario_targets(delivery)
708 self.debug_trace.record_target(delivery.name, "503_resolved_scenario_targets", computed_target)
709 computed_target = delivery.select_targets(computed_target)
710 self.debug_trace.record_target(delivery.name, "504_delivery_selection", computed_target)
712 # If the action call explicitly specified a target for this delivery, it takes
713 # precedence over all resolved/merged targets above.
714 delivery_override: DeliveryCustomization | None = self.delivery_overrides.get(delivery.name)
715 if delivery_override is None:
716 # if override doesn't use a valid delivery name, try a transport name instead
717 delivery_override = self.delivery_overrides.get(delivery.transport.name)
718 if delivery_override and delivery_override.target and delivery_override.target.has_targets():
719 override_target = delivery_override.target
720 # handle and resolve indirect targets, like person->mobile device or email
721 for indirect_target in self.resolve_indirect_targets(override_target, delivery):
722 override_target += indirect_target
723 computed_target = delivery.select_targets(override_target)
724 self.debug_trace.record_target(delivery.name, "600_delivery_override_target", computed_target)
726 split_targets: list[Target] = computed_target.split_by_target_data()
727 self.debug_trace.record_target(delivery.name, "610_delivery_split_targets", split_targets)
729 direct_targets: list[Target] = [t.direct() for t in split_targets]
730 self.debug_trace.record_target(delivery.name, "620_narrow_to_direct", direct_targets)
732 if delivery.options.get(OPTION_UNIQUE_TARGETS, False):
733 direct_targets = [t - self._already_selected for t in direct_targets]
734 self.debug_trace.record_target(delivery.name, "630_make_unique_across_deliveries", direct_targets)
735 for direct_target in direct_targets:
736 self._already_selected += direct_target
737 self.debug_trace.record_target(delivery.name, "999_final_cut", direct_targets)
738 return direct_targets
740 def resolve_scenario_targets(self, delivery: Delivery) -> Target:
741 resolved: Target = Target()
742 for scenario in self.enabled_scenarios.values():
743 customization: DeliveryCustomization | None = scenario.delivery_customization(delivery.name)
744 if customization and customization.target and customization.target.has_targets():
745 resolved += customization.target
746 return resolved
748 def all_recipients(self) -> list[Recipient]:
749 recipients: list[Recipient] = []
750 if self._target:
751 # explicit targets given
752 recipients.extend(
753 self.people_registry.people[pers_ent_id]
754 for pers_ent_id in self._target.person_ids
755 if pers_ent_id in self.people_registry.people and self.people_registry.people[pers_ent_id].enabled
756 )
757 else:
758 # default to all known recipients
759 recipients = self.people_registry.enabled_recipients()
760 recipients = [r for r in recipients if self.recipients_override is None or r.entity_id in self.recipients_override]
761 return recipients
763 def default_person_ids(self, delivery: Delivery) -> Target:
764 # If target not specified on service call or delivery, then default to std list of recipients
765 people: list[Recipient] = self.people_registry.filter_recipients_by_occupancy(delivery.occupancy)
766 people = [p for p in people if self.recipients_override is None or p.entity_id in self.recipients_override]
767 return Target({ATTR_PERSON_ID: [p.entity_id for p in people if p.entity_id]})
769 def resolve_indirect_targets(self, target: Target, delivery: Delivery) -> list[Target]:
770 # enrich data selected in configuration for this delivery, from direct target definition or attrs like email or phone
771 resolved: Target = Target()
772 additional: list[Target] = []
774 for person_id in target.person_ids:
775 recipient: Recipient | None = self.people_registry.people.get(person_id)
776 if recipient and recipient.enabled:
777 recipient_target = recipient.target(delivery.name)
778 if recipient_target.target_specific_data:
779 additional.append(recipient_target)
780 else:
781 resolved += recipient_target
782 else:
783 _LOGGER.debug("SUPERNOTIFY Skipping recipient %s with enabled switched off", person_id)
785 return [resolved, *additional]
787 def generate_envelopes(self, delivery: Delivery, targets: list[Target]) -> list[Envelope]:
788 # now the list of recipients determined, resolve this to target addresses or entities
790 envelopes: list[Envelope] = []
791 for target in targets:
792 # a target is always generated, even if there are no recipients
793 if target.has_resolved_target() or delivery.target_required != TargetRequired.ALWAYS:
794 envelope_data = {}
796 # least priority - delivery derived data
797 envelope_data.update(delivery.data)
798 # next least priority - target derived data
799 if target.target_data:
800 envelope_data.update(target.target_data)
802 # scenario applied at cross-delivery level in apply_enabled_scenarios
803 for scenario in self.enabled_scenarios.values():
804 customization: DeliveryCustomization | None = scenario.delivery_customization(delivery.name)
805 if customization and customization.data:
806 envelope_data.update(customization.data)
808 # apply data from action call last to prioritize it
809 envelope_data.update({
810 k: v for k, v in self.extra_data.items() if k not in INTERNAL_DATA_KEYS
811 }) # action call data
813 envelopes.append(Envelope(delivery, self, target, envelope_data, context=self.context))
815 return envelopes