Coverage for custom_components/supernotify/notification.py: 90%
499 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-03-28 19:39 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-03-28 19:39 +0000
1import datetime as dt
2import logging
3import uuid
4from traceback import format_exception
5from typing import TYPE_CHECKING, Any, cast
7import voluptuous as vol
8from homeassistant.components.notify.const import ATTR_DATA
9from voluptuous import humanize
11from custom_components.supernotify.schema import SelectionRank
13from .archive import ArchivableObject
14from .common import ensure_list, nullable_ensure_list, sanitize
15from .const import (
16 ATTR_ACTION_GROUPS,
17 ATTR_ACTIONS,
18 ATTR_DEBUG,
19 ATTR_DELIVERY,
20 ATTR_DELIVERY_SELECTION,
21 ATTR_FORCE_RESEND,
22 ATTR_MEDIA,
23 ATTR_MEDIA_CLIP_URL,
24 ATTR_MEDIA_SNAPSHOT_URL,
25 ATTR_MESSAGE_HTML,
26 ATTR_PERSON_ID,
27 ATTR_PRIORITY,
28 ATTR_RECIPIENTS,
29 ATTR_SCENARIOS_APPLY,
30 ATTR_SCENARIOS_CONSTRAIN,
31 ATTR_SCENARIOS_REQUIRE,
32 ATTR_SPOKEN_MESSAGE,
33 DELIVERY_SELECTION_EXPLICIT,
34 DELIVERY_SELECTION_FIXED,
35 DELIVERY_SELECTION_IMPLICIT,
36 OPTION_UNIQUE_TARGETS,
37 PRIORITY_MEDIUM,
38 PRIORITY_VALUES,
39 TARGET_USE_FIXED,
40 TARGET_USE_MERGE_ALWAYS,
41 TARGET_USE_MERGE_ON_DELIVERY_TARGETS,
42 TARGET_USE_ON_NO_ACTION_TARGETS,
43 TARGET_USE_ON_NO_DELIVERY_TARGETS,
44)
45from .envelope import Envelope
46from .model import (
47 ConditionVariables,
48 DebugTrace,
49 DeliveryCustomization,
50 SuppressionReason,
51 Target,
52 TargetRequired,
53)
54from .schema import ACTION_DATA_SCHEMA, STRICT_ACTION_DATA_SCHEMA, Outcome
56if TYPE_CHECKING:
57 from .context import Context
58 from .delivery import Delivery, DeliveryRegistry
59 from .people import PeopleRegistry, Recipient
60 from .scenario import Scenario
61 from .transport import (
62 Transport,
63 )
# Module-level logger, named after this module
_LOGGER = logging.getLogger(__name__)

# Deliveries mapping keys for debug / archive
# (the per-delivery outcome buckets written into `Notification.deliveries`)
KEY_DELIVERED = "delivered"
KEY_SUPPRESSED = "suppressed"
KEY_FAILED = "failed"
KEY_SKIPPED = "skipped"

# supernotify specific data items not to be passed to transports in data
INTERNAL_DATA_KEYS = (ATTR_FORCE_RESEND, ATTR_SPOKEN_MESSAGE)

# Readability aliases used in the type annotation of `Notification.deliveries`
type t_delivery_name = str
type t_outcome = str
80class Notification(ArchivableObject):
def __init__(
    self,
    context: Context,
    message: str | None = None,
    title: str | None = None,
    target: list[str] | str | None = None,
    action_data: dict[str, Any] | None = None,
) -> None:
    """Build a notification from the arguments of a notify action call.

    Args:
        context: Shared context; its people and delivery registries are cached on the instance.
        message: Message body, if any.
        title: Optional title.
        target: Explicit target(s) from the action call; wrapped in a `Target` when truthy.
        action_data: The data section of the action call. It is validated, then split into
            supernotify settings and pass-through `extra_data`.

    Raises:
        vol.Invalid: If `action_data` fails validation (see `validate_action_data`).
    """
    self.created: dt.datetime = dt.datetime.now(tz=dt.UTC)
    self.debug_trace: DebugTrace = DebugTrace(message=message, title=title, data=action_data, target=target)
    self.message: str | None = message
    self.context: Context = context
    self.people_registry: PeopleRegistry = context.people_registry
    self.delivery_registry: DeliveryRegistry = context.delivery_registry
    action_data = action_data or {}
    self._target: Target | None = Target(target) if target else None
    self._already_selected: Target = Target()
    self._title: str | None = title
    self.id = str(uuid.uuid1())
    self.delivered: int = 0
    self.error_count: int = 0
    self.skipped: int = 0
    self.failed: int = 0
    self.suppressed: int = 0
    self.fallback: int = 0
    self.dupe: bool = False
    self.deliveries: dict[t_delivery_name, dict[t_outcome, list[str] | list[Envelope] | dict[str, Any]]] = {}
    self._skip_reasons: list[SuppressionReason] = []

    # validation calls `suppress`, which needs `id` and `_skip_reasons` to exist already
    self.validate_action_data(action_data)
    # for compatibility with other notify calls, pass thru surplus data to underlying delivery transports
    # The strict schema is applied once rather than once per key; as before, it is never
    # applied to an empty mapping.
    recognised_keys = STRICT_ACTION_DATA_SCHEMA(action_data) if action_data else {}
    self.extra_data: dict[str, Any] = {k: v for k, v in action_data.items() if k not in recognised_keys}

    action_data = {k: v for k, v in action_data.items() if k not in self.extra_data}
    self.extra_data.update(action_data.get(ATTR_DATA, {}))  # nested `data` could be supernotify or target service

    self.priority: str = action_data.get(ATTR_PRIORITY, PRIORITY_MEDIUM)
    self.message_html: str | None = action_data.get(ATTR_MESSAGE_HTML)
    self.force_resend: bool = action_data.get(ATTR_FORCE_RESEND, False)
    self.required_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_REQUIRE))
    self.applied_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_APPLY))
    self.constrain_scenario_names: list[str] = ensure_list(action_data.get(ATTR_SCENARIOS_CONSTRAIN))
    self.delivery_selection: str | None = action_data.get(ATTR_DELIVERY_SELECTION)
    self.delivery_overrides: dict[str, DeliveryCustomization] = {}

    delivery_data = action_data.get(ATTR_DELIVERY)
    if isinstance(delivery_data, list):
        # a bare list of deliveries implies intent to restrict
        if self.delivery_selection is None:
            _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as explicit for list %s", delivery_data)
            self.delivery_selection = DELIVERY_SELECTION_EXPLICIT
        self.delivery_overrides = {k: DeliveryCustomization({}) for k in delivery_data}
    elif isinstance(delivery_data, str) and delivery_data:
        # a single bare delivery name also implies intent to restrict
        if self.delivery_selection is None:
            _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as explicit for single %s", delivery_data)
            self.delivery_selection = DELIVERY_SELECTION_EXPLICIT
        self.delivery_overrides = {delivery_data: DeliveryCustomization({})}
    elif isinstance(delivery_data, dict):
        # whereas a dict may be used to tune or restrict
        if self.delivery_selection is None:
            self.delivery_selection = DELIVERY_SELECTION_IMPLICIT
            _LOGGER.debug("SUPERNOTIFY defaulting delivery selection as implicit for mapping %s", delivery_data)
        self.delivery_overrides = {k: DeliveryCustomization(v) for k, v in delivery_data.items()}
    else:
        if delivery_data:
            _LOGGER.warning("SUPERNOTIFY Unable to interpret delivery data %s", delivery_data)
        if self.delivery_selection is None:
            self.delivery_selection = DELIVERY_SELECTION_IMPLICIT

    self.action_groups: list[str] | None = nullable_ensure_list(action_data.get(ATTR_ACTION_GROUPS))
    self.recipients_override: list[str] | None = nullable_ensure_list(action_data.get(ATTR_RECIPIENTS))
    self.media: dict[str, Any] = action_data.get(ATTR_MEDIA) or {}
    self.debug: bool = action_data.get(ATTR_DEBUG, False)
    self.actions: list[dict[str, Any]] = ensure_list(action_data.get(ATTR_ACTIONS))

    self.selected_deliveries: dict[str, dict[str, Any]] = {}
    self.enabled_scenarios: dict[str, Scenario] = {}
    self.selected_scenario_names: list[str] = []
    self._suppression_reason: SuppressionReason | None = None
    self._delivery_error: list[str] | None = None
    # populated by `initialize`
    self.condition_variables: ConditionVariables
async def initialize(self) -> None:
    """Async post-construction initialization

    Determines occupancy, builds the condition variables, works out the enabled
    scenarios, then either suppresses the notification or selects its deliveries,
    and finally derives media from `extra_data` when none has been set.
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing - confirm
    # that delivery selection, the snooze check and scenario application all belong to
    # the "required scenarios satisfied" branch.
    self.occupancy: dict[str, list[Recipient]] = self.people_registry.determine_occupancy()
    # occupancy has to exist before the condition variables are built
    self.condition_variables = ConditionVariables(
        self.applied_scenario_names,
        self.required_scenario_names,
        self.constrain_scenario_names,
        self.priority,
        self.occupancy,
        self.message,
        self._title,
        self.extra_data,
    )

    candidate_names: list[str] = list(self.applied_scenario_names)
    self.selected_scenario_names = await self.select_scenarios()
    candidate_names += self.selected_scenario_names

    if self.constrain_scenario_names:
        # explicitly applied scenarios survive the constraint; selected ones must be listed in it
        candidate_names = [
            name
            for name in candidate_names
            if name in self.constrain_scenario_names or name in self.applied_scenario_names
        ]

    required_unmet = self.required_scenario_names and all(
        name not in candidate_names for name in self.required_scenario_names
    )
    if required_unmet:
        _LOGGER.info("SUPERNOTIFY suppressing notification, no required scenarios enabled")
        self.selected_deliveries = {}
        self.suppress(SuppressionReason.NO_SCENARIO)
    else:
        known_scenarios = self.context.scenario_registry.scenarios
        for name in candidate_names:
            scenario = known_scenarios.get(name)
            if scenario is not None:
                self.enabled_scenarios[name] = scenario

        self.selected_deliveries = self.select_deliveries()
        if self.context.snoozer.is_global_snooze(self.priority):
            self.suppress(SuppressionReason.SNOOZED)
        self.apply_enabled_scenarios()

    if not self.media:
        self.media = self.media_requirements(self.extra_data)
def outcome(self) -> Outcome:
    """Summarise the notification's counters as a single `Outcome`.

    The checks are ordered; the first one that matches decides the result.
    """
    if self.error_count > 0:
        verdict = Outcome.ERROR
    elif self.dupe:
        verdict = Outcome.DUPE
    elif not self.delivered:
        verdict = Outcome.NO_DELIVERY
    elif self.fallback:
        verdict = Outcome.FALLBACK_DELIVERY
    elif self.skipped:
        verdict = Outcome.PARTIAL_DELIVERY
    else:
        verdict = Outcome.SUCCESS
    return verdict
def media_requirements(self, data: dict[str, Any]) -> dict[str, Any]:
    """If no media defined, look for iOS / Android actions that have media defined

    Example is the Frigate blueprint, which generates `image`, `video` etc
    in the `data` section, that can also be used for email attachments

    Args:
        data: Pass-through action data; may be empty or None.

    Returns:
        A dict holding a snapshot URL and/or a clip URL when they can be derived from
        `image`, `video` or `attachment.url`; empty otherwise. An `attachment` URL only
        fills a slot that `image` / `video` left empty. A malformed `attachment`
        (not a mapping, or with a non-string `url`) is ignored rather than raising.
    """
    media_dict: dict[str, Any] = {}
    if not data:
        return media_dict
    if data.get("image"):
        media_dict[ATTR_MEDIA_SNAPSHOT_URL] = data.get("image")
    if data.get("video"):
        media_dict[ATTR_MEDIA_CLIP_URL] = data.get("video")

    attachment = data.get("attachment")
    url = attachment.get("url") if isinstance(attachment, dict) else None
    if isinstance(url, str) and url:
        if url.endswith(".mp4"):
            if not media_dict.get(ATTR_MEDIA_CLIP_URL):
                media_dict[ATTR_MEDIA_CLIP_URL] = url
        elif url.endswith((".jpg", ".jpeg", ".png")):
            if not media_dict.get(ATTR_MEDIA_SNAPSHOT_URL):
                media_dict[ATTR_MEDIA_SNAPSHOT_URL] = url
    return media_dict
def validate_action_data(self, action_data: dict[str, Any]) -> None:
    """Check the action data, suppressing the notification and raising if it is invalid.

    A truthy priority must be a str, int or float; a value outside `PRIORITY_VALUES`
    is accepted and only logged. The whole mapping is then validated against
    `ACTION_DATA_SCHEMA`.

    Raises:
        vol.Invalid: For a non-simple priority or any schema validation failure.
    """
    priority = action_data.get(ATTR_PRIORITY)
    if priority:
        if not isinstance(priority, (str, int, float)):
            _LOGGER.info("SUPERNOTIFY Invalid priority %s", priority)
            self.suppress(SuppressionReason.INVALID_ACTION_DATA)
            raise vol.Invalid("Priority value must be a simple value")
        if priority not in PRIORITY_VALUES:
            _LOGGER.info("SUPERNOTIFY custom priority %s", priority)

    try:
        humanize.validate_with_humanized_errors(action_data, ACTION_DATA_SCHEMA)
    except vol.Invalid as invalid:
        _LOGGER.warning("SUPERNOTIFY invalid action data %s: %s", action_data, invalid)
        self.suppress(SuppressionReason.INVALID_ACTION_DATA)
        raise
    except vol.error.Error as failure:
        # any other voluptuous error is re-raised as vol.Invalid, keeping the cause chained
        _LOGGER.warning("SUPERNOTIFY failed to validate action data %s: %s", action_data, failure)
        self.suppress(SuppressionReason.INVALID_ACTION_DATA)
        raise vol.Invalid(f"Unable to validate action data - {failure}") from failure
def apply_enabled_scenarios(self) -> None:
    """Merge media and action groups from the enabled scenarios into this notification.

    Scenarios are visited in `enabled_scenarios` order. Media is merged key by key,
    so a later scenario's value replaces an earlier one for the same key. The merge
    always happens on a fresh dict, so neither a scenario's own `media` nor the dict
    previously held in `self.media` is written to. Action groups are appended in
    first-seen order, skipping any already present on the notification or
    contributed by an earlier scenario.
    """
    scenario_groups: list[str] = []
    for scenario in self.enabled_scenarios.values():
        if scenario.media:
            # copy before merging: assigning or updating in place would leak one
            # notification's media into shared scenario configuration
            merged: dict[str, Any] = dict(self.media) if self.media else {}
            merged.update(scenario.media)
            self.media = merged
        if scenario.action_groups:
            for group in scenario.action_groups:
                if group not in scenario_groups:
                    scenario_groups.append(group)

    # self.action_groups only accessed from inside Envelope
    if self.action_groups:
        existing = list(self.action_groups)
        self.action_groups = existing + [g for g in scenario_groups if g not in existing]
    else:
        self.action_groups = scenario_groups
def select_deliveries(self) -> dict[str, dict[str, Any]]:
    """Work out which deliveries this notification will use, in rank order.

    Candidate names come from enabled scenarios, from recipients, from the registry's
    implicit deliveries (only for implicit selection) and from the overrides given in
    the action call. Names disabled by a scenario or an override are removed, unless
    the action call explicitly enabled them. Survivors are ordered FIRST, ANY, LAST.

    All intermediate lists are de-duplicated while keeping first-seen order, so the
    result is deterministic and a recipient is never listed twice for one delivery.

    Returns:
        Mapping of delivery name to details. Deliveries enabled only through
        recipients carry a `recipients` list of the entity ids that enabled them.
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing - confirm
    # that recipient-enabled deliveries are meant to be skipped for "fixed" selection.
    scenario_enable_deliveries: list[str] = []
    scenario_disable_deliveries: list[str] = []
    default_enable_deliveries: list[str] = []
    recipients_enable_deliveries: list[str] = []
    recipients: list[Recipient] = []

    if self.delivery_selection != DELIVERY_SELECTION_FIXED:
        for scenario in self.enabled_scenarios.values():
            scenario_enable_deliveries.extend(scenario.enabling_deliveries())
        for scenario in self.enabled_scenarios.values():
            scenario_disable_deliveries.extend(scenario.disabling_deliveries())

        # dict.fromkeys de-duplicates while preserving order (a set would not)
        scenario_enable_deliveries = list(dict.fromkeys(scenario_enable_deliveries))
        scenario_disable_deliveries = list(dict.fromkeys(scenario_disable_deliveries))

        recipients = self.all_recipients()
        for recipient in recipients:
            recipients_enable_deliveries.extend(recipient.enabling_delivery_names())
        # several recipients may enable the same delivery
        recipients_enable_deliveries = list(dict.fromkeys(recipients_enable_deliveries))
        if self.delivery_selection == DELIVERY_SELECTION_IMPLICIT:
            # all deliveries with SELECTION_DEFAULT in CONF_SELECTION
            default_enable_deliveries = [d.name for d in self.context.delivery_registry.implicit_deliveries]

    self.debug_trace.record_delivery_selection("scenario_enable_deliveries", scenario_enable_deliveries)
    self.debug_trace.record_delivery_selection("scenario_disable_deliveries", scenario_disable_deliveries)
    self.debug_trace.record_delivery_selection("default_enable_deliveries", default_enable_deliveries)
    self.debug_trace.record_delivery_selection("recipient_enable_deliveries", recipients_enable_deliveries)

    override_enable_deliveries: list[str] = []
    override_disable_deliveries: list[str] = []

    # apply the deliveries defined in the notification action call
    for delivery, delivery_override in self.delivery_overrides.items():
        if (
            (delivery_override is None or delivery_override.enabled is True)
            and delivery in self.context.delivery_registry.enabled_deliveries
        ) or (
            (delivery_override is not None and delivery_override.enabled is True)
            and delivery in self.context.delivery_registry.disabled_deliveries
        ):
            override_enable_deliveries.append(delivery)
        elif delivery_override is not None and delivery_override.enabled is False:
            override_disable_deliveries.append(delivery)

    all_global_enabled: list[str] = list(
        dict.fromkeys(scenario_enable_deliveries + default_enable_deliveries + override_enable_deliveries)
    )
    all_enabled: list[str] = list(dict.fromkeys(all_global_enabled + recipients_enable_deliveries))
    # override_enable_deliveries takes precedence: if the action call explicitly
    # re-enables a delivery that a scenario disabled, remove it from all_disabled.
    all_disabled: list[str] = [
        d for d in scenario_disable_deliveries + override_disable_deliveries if d not in override_enable_deliveries
    ]
    # only used for membership tests
    override_enabled: set[str] = set(scenario_enable_deliveries + override_enable_deliveries)
    self.debug_trace.record_delivery_selection("override_disable_deliveries", override_disable_deliveries)
    self.debug_trace.record_delivery_selection("override_enable_deliveries", override_enable_deliveries)

    unsorted_maybe_objs: list[Delivery | None] = [
        self.delivery_registry.deliveries.get(d) for d in all_enabled if d not in all_disabled
    ]
    # drop unknown names, and disabled deliveries that nothing explicitly enabled
    unsorted_objs: list[Delivery] = [
        d for d in unsorted_maybe_objs if d is not None and (d.enabled or d.name in override_enabled)
    ]
    first: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.FIRST]
    anywhere: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.ANY]
    last: list[str] = [d.name for d in unsorted_objs if d.selection_rank == SelectionRank.LAST]
    selected = first + anywhere + last
    self.debug_trace.record_delivery_selection("ranked", selected)

    # TODO: clean up this ugly logic, reorganize delivery around people
    results: dict[str, dict[str, Any]] = {d: {} for d in selected}
    # deliveries that are only present because a recipient asked for them
    personal_deliveries = [d for d in selected if d not in all_global_enabled and d in recipients_enable_deliveries]
    for personal_delivery in personal_deliveries:
        results[personal_delivery].setdefault("recipients", [])
        for recipient in recipients:
            if personal_delivery in recipient.enabling_delivery_names():
                results[personal_delivery]["recipients"].append(recipient.entity_id)
    return results
def suppress(self, reason: SuppressionReason) -> None:
    """Flag the whole notification as suppressed for `reason`.

    The latest reason replaces any earlier one; every distinct reason is also kept,
    once, in the list of skip reasons.
    """
    self._suppression_reason = reason
    already_noted = reason in self._skip_reasons
    if not already_noted:
        self._skip_reasons.append(reason)
    _LOGGER.info(f"SUPERNOTIFY Suppressing notification, reason:{reason}, id:{self.id}")
async def deliver(self) -> bool:
    """Send the notification through each selected delivery, then apply fallbacks.

    When the notification has been suppressed, every selected delivery is recorded as
    skipped with the notification's actual suppression reason and no transport is
    called. If nothing was delivered and the notification was not suppressed,
    fallback deliveries that were not already selected are tried: the
    fallback-by-default ones when nothing failed and nothing was a duplicate, the
    fallback-on-error ones when at least one delivery failed.

    Returns:
        True if at least one envelope was delivered.
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing - confirm
    # that both fallback stages sit under the "nothing delivered" check and that
    # `fallback` is only counted for fallbacks actually attempted.
    _LOGGER.debug(
        "Message: %s, notification: %s, deliveries: %s",
        self.message,
        self.id,
        self.selected_deliveries,
    )

    for delivery_name, details in self.selected_deliveries.items():
        self.deliveries[delivery_name] = {}
        delivery = self.context.delivery_registry.deliveries.get(delivery_name)
        if self._suppression_reason is not None:
            _LOGGER.info(
                "SUPERNOTIFY Suppressing delivery %s of suppressed notification (%s), reason: %s",
                delivery_name,
                self.id,
                self._suppression_reason,
            )
            # record why it was really suppressed, which is not necessarily a snooze
            self.record_result(delivery, suppression_reason=self._suppression_reason)
        elif delivery:
            await self.call_transport(delivery, recipients=details.get("recipients"))
        else:
            _LOGGER.error(f"SUPERNOTIFY Unexpected missing delivery {delivery_name}")

    if self.delivered == 0 and not self._suppression_reason:
        if self.failed == 0 and not self.dupe:
            for delivery in self.context.delivery_registry.fallback_by_default_deliveries:
                if delivery.name not in self.selected_deliveries:
                    _LOGGER.info(
                        "SUPERNOTIFY no delivery succeeded, activating fallback_by_default: %s",
                        delivery.name,
                    )
                    await self.call_transport(delivery)
                    self.fallback += 1

        if self.failed > 0:
            for delivery in self.context.delivery_registry.fallback_on_error_deliveries:
                if delivery.name not in self.selected_deliveries:
                    _LOGGER.warning(
                        "SUPERNOTIFY delivery failed, activating fallback_on_error: %s",
                        delivery.name,
                    )
                    await self.call_transport(delivery)
                    self.fallback += 1

    return self.delivered > 0
async def call_transport(self, delivery: Delivery, recipients: list[str] | None = None) -> None:
    """Run one delivery: check it is applicable, build envelopes and hand them to its transport.

    The delivery is recorded as skipped when its transport is disabled, when the
    notification priority is not one the delivery accepts, when its conditions do
    not hold, or when no envelope could be generated. Each envelope is otherwise
    checked for duplication (unless `force_resend`), delivered, and recorded.
    Exceptions never propagate: per-envelope failures are recorded on the envelope,
    anything else is stored under the delivery's `errors` list.
    """
    try:
        transport: Transport = delivery.transport
        if not transport.enabled:
            self.record_result(delivery, suppression_reason=SuppressionReason.TRANSPORT_DISABLED)
            _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on transport disabled", delivery)
            return

        accepted_priorities: list[str] = delivery.priority
        if self.priority and accepted_priorities and self.priority not in accepted_priorities:
            _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on priority (%s)", delivery, self.priority)
            self.record_result(delivery, suppression_reason=SuppressionReason.PRIORITY)
            return
        if not delivery.evaluate_conditions(self.condition_variables):
            _LOGGER.debug("SUPERNOTIFY Skipping delivery %s based on conditions", delivery)
            self.record_result(delivery, suppression_reason=SuppressionReason.DELIVERY_CONDITION)
            return

        targets: list[Target] = self.generate_targets(delivery, recipients=recipients)
        envelopes: list[Envelope] = self.generate_envelopes(delivery, targets)
        if not envelopes:
            has_resolved_target = targets and any(t.has_resolved_target() for t in targets)
            reason: SuppressionReason = (
                SuppressionReason.NO_TARGET
                if delivery.target_required == TargetRequired.ALWAYS and not has_resolved_target
                else SuppressionReason.UNKNOWN
            )
            self.record_result(delivery, targets=targets, suppression_reason=reason)
            return

        for envelope in envelopes:
            if not self.force_resend and self.context.dupe_checker.check(envelope):
                _LOGGER.debug("SUPERNOTIFY Suppressing dupe envelope, %s", self.message)
                self.record_result(delivery, envelope, suppression_reason=SuppressionReason.DUPE)
                continue
            try:
                was_delivered = await transport.deliver(envelope, debug_trace=self.debug_trace)
                if not was_delivered:
                    _LOGGER.info(
                        "SUPERNOTIFY No delivery for %s (targets: %s)",
                        delivery.name,
                        envelope.target.as_dict() if envelope.target else "NONE",
                    )
                self.record_result(delivery, envelope)
            except Exception as delivery_failure:
                _LOGGER.exception("SUPERNOTIFY Failed to deliver %s: %s", delivery.name, delivery_failure)
                envelope.error_count += 1
                transport.record_error(str(delivery_failure), method="deliver")
                envelope.delivery_error = format_exception(delivery_failure)
                self.record_result(delivery, envelope)

    except Exception as failure:
        _LOGGER.exception(
            "SUPERNOTIFY Failed to notify using delivery %s via %s",
            delivery.name,
            type(delivery.transport).__name__,
        )
        per_delivery = self.deliveries.setdefault(delivery.name, {})
        errors: list[str] = cast("list[str]", per_delivery.setdefault("errors", []))
        errors.append("\n".join(format_exception(failure)))
def record_result(
    self,
    delivery: Delivery | None,
    envelope: Envelope | None = None,
    targets: list[Target] | None = None,
    suppression_reason: SuppressionReason | None = None,
) -> None:
    """Debugging (and unit test) support for notifications that failed or were skipped

    With an envelope (and a delivery), the envelope's counts are added to the
    notification's and the envelope is filed under the delivery as delivered,
    failed or suppressed. Without an envelope, a skip summary is stored for the
    delivery. An envelope with no delivery is ignored.
    """
    if not envelope:
        delivery_name: str = delivery.name if delivery else "!UNKNOWN!"
        skip_summary: dict[str, Any] = {
            "target_required": delivery.target_required if delivery else "!UNKNOWN!",
            "suppression_reason": str(suppression_reason),
        }
        if targets:
            skip_summary["targets"] = targets
        self.deliveries.setdefault(delivery_name, {})[KEY_SKIPPED] = skip_summary
        self.skipped += 1
        return

    if not delivery:
        return

    self.delivered += envelope.delivered
    self.error_count += envelope.error_count
    outcomes = self.deliveries.setdefault(delivery.name, {})
    if envelope.delivered:
        outcomes.setdefault(KEY_DELIVERED, []).append(envelope)  # type: ignore
        return

    if suppression_reason:
        envelope.skip_reason = suppression_reason
        if suppression_reason not in self._skip_reasons:
            self._skip_reasons.append(suppression_reason)
        if suppression_reason == SuppressionReason.DUPE:
            self.dupe = True

    # an undelivered envelope counts as failed if it recorded errors, else as suppressed
    if envelope.error_count:
        outcomes.setdefault(KEY_FAILED, []).append(envelope)  # type: ignore
        self.failed += 1
    else:
        outcomes.setdefault(KEY_SUPPRESSED, []).append(envelope)  # type: ignore
        self.suppressed += 1
def contents(self, diagnostics: bool = False, **_kwargs: Any) -> dict[str, Any]:
    """ArchiveableObject implementation

    Args:
        diagnostics: When False (the default) a minimal form is produced: values are
            sanitized with `minimal=True`, `enabled_scenarios` is reduced to its
            top-level keys and the debug trace is left out.

    Returns:
        A dict of the notification's sanitized attributes, led by a fixed set of
        fields in a reader-friendly order. A `stats` entry with aggregate timing and
        success rate is added when at least one delivery was delivered or failed.
    """
    minimal = not diagnostics
    object_refs = ["context", "people_registry", "delivery_registry"]
    keys_only = ["enabled_scenarios"]
    debug_only = ["debug_trace"]
    exposed_if_populated = ["_delivery_error", "message_html", "extra_data", "actions", "_suppression_reason"]
    # fine tune dict order to ease the eye-burden when reviewing archived notifications
    preferred_order = [
        "id",
        "created",
        "message",
        "applied_scenario_names",
        "constrain_scenario_names",
        "required_scenario_names",
        "enabled_scenarios",
        "selected_scenario_names",
        "delivery_selection",
        "delivery_overrides",
        "selected_deliveries",
        "recipients_override",
        "delivered",
        "failed",
        "suppressed",
        "skipped",
        "error_count",
        "deliveries",
    ]
    # preferred fields
    result = {
        k: sanitize(
            self.__dict__[k], minimal=minimal, occupancy_only=True, top_level_keys_only=(minimal and k in keys_only)
        )
        for k in preferred_order
    }
    # all the rest not explicitly excluded
    result.update({
        k: sanitize(v, minimal=minimal, occupancy_only=True)
        for k, v in self.__dict__.items()
        if k not in result
        and k not in exposed_if_populated
        and k not in object_refs
        and not k.startswith("_")
        and (not minimal or k not in keys_only)
        and (not minimal or k not in debug_only)
    })
    # the exposed only if populated fields
    result.update({
        k: sanitize(self.__dict__[k], minimal=minimal, occupancy_only=True)
        for k in exposed_if_populated
        if self.__dict__.get(k)
    })
    # delivery_stats: aggregate delivery metrics
    # Best effort: a failure here is logged and must never prevent archiving.
    try:
        all_durations: dict[str, float] = {}
        total_ok = 0
        total_all = 0
        for d_name, outcomes in self.deliveries.items():
            for envelope in outcomes.get(KEY_DELIVERED, []):
                dur = sum(c.contents().get("elapsed", 0) for c in getattr(envelope, "calls", [])) * 1000
                # accumulate, so a delivery with several envelopes reports their combined time
                all_durations[d_name] = all_durations.get(d_name, 0) + dur
                total_ok += 1
                total_all += 1
            for _envelope in outcomes.get(KEY_FAILED, []):
                all_durations.setdefault(d_name, 0)
                total_all += 1
            if outcomes.get(KEY_SKIPPED):
                total_all += 1
        if all_durations:
            result["stats"] = {
                "total_duration_ms": round(sum(all_durations.values()), 1),
                "slowest_delivery": max(all_durations, key=lambda k: all_durations[k]),
                "fastest_delivery": min(all_durations, key=lambda k: all_durations[k]),
                "delivery_success_rate": round(total_ok / total_all, 2) if total_all else 1.0,
            }
    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY delivery_stats computation failed: %s", e)
    return result
def base_filename(self) -> str:
    """ArchiveableObject implementation

    The name is the creation time in ISO format cut to minute precision, with
    colons replaced by dashes, followed by an underscore and the notification id.
    """
    minute_stamp = self.created.isoformat()[:16]
    safe_stamp = minute_stamp.replace(":", "-")
    return f"{safe_stamp}_{self.id}"
def delivery_data(self, delivery: Delivery) -> dict[str, Any]:
    """Return the data the action call supplied for `delivery`, or an empty dict.

    The override is looked up by delivery name first and, if there is none, by the
    name of the delivery's transport.
    """
    if delivery is None:
        return {}
    override: DeliveryCustomization | None = self.delivery_overrides.get(delivery.name)
    if override is None:
        override = self.delivery_overrides.get(delivery.transport.name)
    if override and override.data:
        return override.data
    return {}
@property
def delivered_envelopes(self) -> list[Envelope]:
    """All envelopes recorded as delivered, across every delivery."""
    return [
        envelope
        for outcomes in self.deliveries.values()
        for envelope in cast("list[Envelope]", outcomes.get(KEY_DELIVERED, []))
    ]
@property
def undelivered_envelopes(self) -> list[Envelope]:
    """All envelopes recorded as suppressed or failed, across every delivery.

    For each delivery the suppressed envelopes come before the failed ones.
    """
    collected: list[Envelope] = []
    for outcomes in self.deliveries.values():
        for bucket in (KEY_SUPPRESSED, KEY_FAILED):
            collected.extend(cast("list[Envelope]", outcomes.get(bucket, [])))
    return collected
async def select_scenarios(self) -> list[str]:
    """Return the names of the registered scenarios that evaluate true for this notification."""
    matching: list[str] = []
    for scenario in self.context.scenario_registry.scenarios.values():
        if scenario.evaluate(self.condition_variables):
            matching.append(scenario.name)
    return matching
def generate_targets(self, delivery: Delivery, recipients: list[str] | None = None) -> list[Target]:
    """Compute the targets that `delivery` should send to for this notification.

    Steps, each recorded in the debug trace:
      1. pick a starting target: the delivery's own (fixed usage), the given
         `recipients`, the default people for the delivery, or the action call's target
      2. filter through the snoozer, add resolved indirect targets and scenario
         targets, and let the delivery select what it can use
      3. add the delivery's configured target according to its `target_usage`
      4. repeat step 2 on the combined target
      5. if the action call supplied a target for this delivery (by delivery name, or
         else by transport name), that target replaces everything computed so far
      6. split by target data, narrow to direct targets and, if the delivery asks for
         unique targets, remove those already selected by earlier deliveries

    Args:
        delivery: The delivery to compute targets for.
        recipients: Recipients pre-selected for this delivery, if any.

    Returns:
        The final list of direct targets, one per distinct set of target data.
    """
    # NOTE(review): block nesting was reconstructed from a flattened listing - confirm
    # which `record_target` calls are conditional and that `_already_selected` is
    # accumulated for every delivery, not only those with the unique-targets option.

    if delivery.target_required == TargetRequired.NEVER:
        # don't waste time computing targets for deliveries that don't need them
        return [Target(None, target_data=delivery.data)]

    computed_target: Target

    if delivery.target_usage == TARGET_USE_FIXED:
        if delivery.target:
            computed_target = delivery.target.safe_copy()
            self.debug_trace.record_target(delivery.name, "100_delivery_default_fixed", computed_target)
        else:
            computed_target = Target(None, target_data=delivery.data)
            self.debug_trace.record_target(delivery.name, "101_delivery_default_fixed_empty", computed_target)
    elif recipients is not None:
        computed_target = Target(recipients)
        # NOTE(review): trace label says "fixed" although this is the recipients branch;
        # left unchanged in case archived traces or tests rely on it
        self.debug_trace.record_target(delivery.name, "102_delivery_default_fixed", computed_target)

    elif not self._target:
        # Unless there are explicit targets, include everyone on the people registry
        computed_target = self.default_person_ids(delivery)
        self.debug_trace.record_target(delivery.name, "201_no_action_target", computed_target)
    else:
        computed_target = self._target.safe_copy()
        self.debug_trace.record_target(delivery.name, "202_action_target", computed_target)

    # 1st round of filtering for snooze and resolving people->direct targets
    computed_target = self.context.snoozer.filter_recipients(computed_target, self.priority, delivery)
    self.debug_trace.record_target(delivery.name, "300_post_snooze", computed_target)
    # turn person_ids into emails and phone numbers
    for indirect_target in self.resolve_indirect_targets(computed_target, delivery):
        computed_target += indirect_target
    self.debug_trace.record_target(delivery.name, "310_resolve_indirect", computed_target)
    computed_target += self.resolve_scenario_targets(delivery)
    self.debug_trace.record_target(delivery.name, "320_resolved_scenario_targets", computed_target)
    # filter out target not required for this delivery
    computed_target = delivery.select_targets(computed_target)
    self.debug_trace.record_target(delivery.name, "330_delivery_selection", computed_target)
    primary_count = len(computed_target)

    if delivery.target_usage == TARGET_USE_ON_NO_DELIVERY_TARGETS:
        if not computed_target.has_targets() and delivery.target:
            computed_target += delivery.target
            self.debug_trace.record_target(delivery.name, "400_delivery_default_no_delivery_targets", computed_target)
    elif delivery.target_usage == TARGET_USE_ON_NO_ACTION_TARGETS:
        if not self._target and delivery.target:
            computed_target += delivery.target
            self.debug_trace.record_target(delivery.name, "401_delivery_default_no_action_targets", computed_target)
    elif delivery.target_usage == TARGET_USE_MERGE_ON_DELIVERY_TARGETS:
        # merge in the delivery defaults if there's a target defined in action call
        if computed_target.has_targets() and delivery.target:
            computed_target += delivery.target
            self.debug_trace.record_target(delivery.name, "402_delivery_merge_on_delivery_targets", computed_target)
    elif delivery.target_usage == TARGET_USE_MERGE_ALWAYS:
        # merge in the delivery defaults even if there's not a target defined in action call
        if delivery.target:
            computed_target += delivery.target
            self.debug_trace.record_target(delivery.name, "403_delivery_merge_always_targets", computed_target)
    elif delivery.target_usage == TARGET_USE_FIXED:
        _LOGGER.debug("SUPERNOTIFY Fixed target on delivery %s", delivery.name)
        self.debug_trace.record_target(delivery.name, "404_fixed_target", computed_target)
    else:
        self.debug_trace.record_target(delivery.name, "405_no_target_usage_match", computed_target)
        _LOGGER.debug("SUPERNOTIFY No useful target definition for delivery %s", delivery.name)

    if len(computed_target) > primary_count:
        _LOGGER.debug(
            "SUPERNOTIFY Delivery config added %s targets for %s", len(computed_target) - primary_count, delivery.name
        )

    # 2nd round of filtering for snooze and resolving people->direct targets after delivery target applied
    computed_target = self.context.snoozer.filter_recipients(computed_target, self.priority, delivery)
    self.debug_trace.record_target(delivery.name, "501_post_snooze", computed_target)
    for indirect_target in self.resolve_indirect_targets(computed_target, delivery):
        computed_target += indirect_target
    self.debug_trace.record_target(delivery.name, "502_resolved_indirect_targets", computed_target)
    computed_target += self.resolve_scenario_targets(delivery)
    self.debug_trace.record_target(delivery.name, "503_resolved_scenario_targets", computed_target)
    computed_target = delivery.select_targets(computed_target)
    self.debug_trace.record_target(delivery.name, "504_delivery_selection", computed_target)

    # If the action call explicitly specified a target for this delivery, it takes
    # precedence over all resolved/merged targets above.
    delivery_override: DeliveryCustomization | None = self.delivery_overrides.get(delivery.name)
    if delivery_override is None:
        # if override doesn't use a valid delivery name, try a transport name instead
        delivery_override = self.delivery_overrides.get(delivery.transport.name)
    if delivery_override and delivery_override.target and delivery_override.target.has_targets():
        # Work on a copy, as is done for the other borrowed targets above: an override
        # found by transport name can be shared by several deliveries, and the targets
        # resolved for one of them must not accumulate on the override itself.
        override_target = delivery_override.target.safe_copy()
        # handle and resolve indirect targets, like person->mobile device or email
        for indirect_target in self.resolve_indirect_targets(override_target, delivery):
            override_target += indirect_target
        computed_target = delivery.select_targets(override_target)
        self.debug_trace.record_target(delivery.name, "600_delivery_override_target", computed_target)

    split_targets: list[Target] = computed_target.split_by_target_data()
    self.debug_trace.record_target(delivery.name, "610_delivery_split_targets", split_targets)

    direct_targets: list[Target] = [t.direct() for t in split_targets]
    self.debug_trace.record_target(delivery.name, "620_narrow_to_direct", direct_targets)

    if delivery.options.get(OPTION_UNIQUE_TARGETS, False):
        direct_targets = [t - self._already_selected for t in direct_targets]
        self.debug_trace.record_target(delivery.name, "630_make_unique_across_deliveries", direct_targets)
    # remember what has been selected so later unique-target deliveries can exclude it
    for direct_target in direct_targets:
        self._already_selected += direct_target
    self.debug_trace.record_target(delivery.name, "999_final_cut", direct_targets)
    return direct_targets
def resolve_scenario_targets(self, delivery: Delivery) -> Target:
    """Merge the targets that enabled scenarios define for ``delivery``.

    Every enabled scenario is asked for its customization of the delivery
    (looked up by delivery name); customizations whose target reports
    ``has_targets()`` are accumulated into a single ``Target``.

    Returns:
        The accumulated target, or a freshly constructed ``Target`` when no
        scenario contributes one.
    """
    scenario_targets: Target = Target()
    # lazily evaluated, so each lookup still happens just before its merge
    customizations = (
        scenario.delivery_customization(delivery.name) for scenario in self.enabled_scenarios.values()
    )
    for custom in customizations:
        if not custom or not custom.target:
            continue
        if custom.target.has_targets():
            scenario_targets += custom.target
    return scenario_targets
def all_recipients(self) -> list[Recipient]:
    """Return the recipients this notification should consider.

    When the notification has a truthy ``_target``, only its person ids that
    are present in the people registry and flagged ``enabled`` are used, in
    the order the target lists them. Otherwise the registry's enabled
    recipients are used.

    In both cases a ``recipients_override`` that is not ``None`` restricts
    the result to recipients whose ``entity_id`` it contains.
    """
    if not self._target:
        # nothing explicit on the notification: start from everyone enabled
        candidates: list[Recipient] = self.people_registry.enabled_recipients()
    else:
        # explicit targets given: keep only registered, enabled people
        registered = self.people_registry.people
        candidates = [
            registered[person_id]
            for person_id in self._target.person_ids
            if person_id in registered and registered[person_id].enabled
        ]

    allowed = self.recipients_override
    if allowed is None:
        # always hand back a new list, never the registry's own
        return list(candidates)
    return [candidate for candidate in candidates if candidate.entity_id in allowed]
def default_person_ids(self, delivery: Delivery) -> Target:
    """Build the fallback person-id target for ``delivery``.

    Used when no target was specified on the service call or the delivery:
    takes the recipients the people registry returns for the delivery's
    occupancy setting, drops any not listed in ``recipients_override`` (when
    that is not ``None``), and skips recipients with an empty ``entity_id``.

    Returns:
        A ``Target`` built from a mapping of ``ATTR_PERSON_ID`` to the
        remaining entity ids.
    """
    permitted = self.recipients_override
    person_ids = [
        candidate.entity_id
        for candidate in self.people_registry.filter_recipients_by_occupancy(delivery.occupancy)
        if (permitted is None or candidate.entity_id in permitted) and candidate.entity_id
    ]
    return Target({ATTR_PERSON_ID: person_ids})
def resolve_indirect_targets(self, target: Target, delivery: Delivery) -> list[Target]:
    """Expand the person ids in ``target`` into recipient targets for ``delivery``.

    Each person id is looked up in the people registry. Enabled recipients
    are asked for their target for this delivery (by delivery name):

    * a recipient target carrying ``target_specific_data`` is kept as its own
      entry, so that data stays attached to that recipient only;
    * any other recipient target is merged into one combined target.

    Unknown person ids and disabled recipients are skipped, each with its own
    debug message so the log states the real reason.

    Returns:
        A list whose first element is the combined target (possibly without
        any entries), followed by the separately kept recipient targets in
        person-id order.
    """
    merged: Target = Target()
    standalone: list[Target] = []

    for person_id in target.person_ids:
        recipient: Recipient | None = self.people_registry.people.get(person_id)
        if not recipient:
            # not in the registry, which is a different situation from being disabled
            _LOGGER.debug("SUPERNOTIFY Skipping unknown recipient %s", person_id)
            continue
        if not recipient.enabled:
            _LOGGER.debug("SUPERNOTIFY Skipping recipient %s with enabled switched off", person_id)
            continue

        recipient_target = recipient.target(delivery.name)
        if recipient_target.target_specific_data:
            standalone.append(recipient_target)
        else:
            merged += recipient_target

    return [merged, *standalone]
def generate_envelopes(self, delivery: Delivery, targets: list[Target]) -> list[Envelope]:
    """Create one ``Envelope`` per usable target for ``delivery``.

    A target is skipped only when it has no resolved target and the delivery
    declares ``TargetRequired.ALWAYS``. For every other target the envelope
    data is layered, later layers overriding earlier ones:

    1. the delivery's own ``data``
    2. the target's ``target_data``, if any
    3. ``data`` from each enabled scenario's customization of this delivery
    4. the action call's ``extra_data``, minus ``INTERNAL_DATA_KEYS``

    Returns:
        The envelopes in the same order as the targets that produced them.
    """
    built: list[Envelope] = []
    for target in targets:
        if not target.has_resolved_target() and delivery.target_required == TargetRequired.ALWAYS:
            # delivery insists on a target and this one resolved to nothing
            continue

        # least priority - delivery derived data
        payload: dict[str, Any] = dict(delivery.data)

        # next least priority - target derived data
        if target.target_data:
            payload.update(target.target_data)

        # scenario applied at cross-delivery level in apply_enabled_scenarios
        for scenario in self.enabled_scenarios.values():
            custom: DeliveryCustomization | None = scenario.delivery_customization(delivery.name)
            if custom and custom.data:
                payload.update(custom.data)

        # action call data goes in last so that it wins over everything else
        call_data = {key: value for key, value in self.extra_data.items() if key not in INTERNAL_DATA_KEYS}
        payload.update(call_data)

        built.append(Envelope(delivery, self, target, payload, context=self.context))

    return built