Coverage for custom_components/supernotify/notify.py: 89%
309 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
1"""Supernotify service, extending BaseNotificationService"""
3import json
4import logging
5from dataclasses import asdict
6from traceback import format_exception
7from typing import TYPE_CHECKING, Any
9from homeassistant.components.notify import (
10 NotifyEntity,
11 NotifyEntityFeature,
12)
13from homeassistant.components.notify.legacy import BaseNotificationService
14from homeassistant.const import EVENT_HOMEASSISTANT_STOP, STATE_OFF, STATE_ON, STATE_UNKNOWN, EntityCategory, Platform
15from homeassistant.core import (
16 Event,
17 EventStateChangedData,
18 HomeAssistant,
19 ServiceCall,
20 State,
21 SupportsResponse,
22 callback,
23)
24from homeassistant.helpers.json import ExtendedJSONEncoder
25from homeassistant.helpers.reload import async_setup_reload_service
27from . import DOMAIN, PLATFORMS
28from .archive import ARCHIVE_PURGE_MIN_INTERVAL, NotificationArchive
29from .common import DupeChecker, sanitize
30from .const import (
31 ATTR_ACTION,
32 ATTR_DATA,
33 CONF_ACTION_GROUPS,
34 CONF_ACTIONS,
35 CONF_ARCHIVE,
36 CONF_CAMERAS,
37 CONF_DELIVERY,
38 CONF_DUPE_CHECK,
39 CONF_HOUSEKEEPING,
40 CONF_HOUSEKEEPING_TIME,
41 CONF_LINKS,
42 CONF_MEDIA_PATH,
43 CONF_MEDIA_STORAGE_DAYS,
44 CONF_MOBILE_DISCOVERY,
45 CONF_RECIPIENTS,
46 CONF_RECIPIENTS_DISCOVERY,
47 CONF_SCENARIOS,
48 CONF_SNOOZE,
49 CONF_TEMPLATE_PATH,
50 CONF_TRANSPORTS,
51 PRIORITY_MEDIUM,
52)
53from .context import Context
54from .delivery import DeliveryRegistry
55from .hass_api import HomeAssistantAPI
56from .media_grab import MediaStorage
57from .model import ConditionVariables, SuppressionReason
58from .notification import Notification
59from .people import PeopleRegistry, Recipient
60from .scenario import ScenarioRegistry
61from .schema import SUPERNOTIFY_SCHEMA as PLATFORM_SCHEMA
62from .snoozer import Snoozer
63from .transports.alexa_devices import AlexaDevicesTransport
64from .transports.alexa_media_player import AlexaMediaPlayerTransport
65from .transports.chime import ChimeTransport
66from .transports.email import EmailTransport
67from .transports.generic import GenericTransport
68from .transports.media_player import MediaPlayerTransport
69from .transports.mobile_push import MobilePushTransport
70from .transports.mqtt import MQTTTransport
71from .transports.notify_entity import NotifyEntityTransport
72from .transports.persistent import PersistentTransport
73from .transports.sms import SMSTransport
74from .transports.tts import TTSTransport
76if TYPE_CHECKING:
77 import datetime as dt
79 from homeassistant.helpers import entity_registry as er
80 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
82 from .scenario import Scenario
83 from .transport import Transport
# NOTE(review): Home Assistant platform-level setting; 0 presumably means
# "no limit on parallel updates" - confirm against the HA integration docs.
PARALLEL_UPDATES = 0

# Module-level logger shared by everything in this file.
_LOGGER = logging.getLogger(__name__)

# Transport classes handed to DeliveryRegistry when the service is constructed.
# `Transport` is only imported under TYPE_CHECKING, so this annotation relies on
# annotations not being evaluated eagerly at import time.
TRANSPORTS: list[type[Transport]] = [
    EmailTransport,
    SMSTransport,
    MQTTTransport,
    AlexaDevicesTransport,
    AlexaMediaPlayerTransport,
    MobilePushTransport,
    MediaPlayerTransport,
    ChimeTransport,
    PersistentTransport,
    GenericTransport,
    TTSTransport,
    NotifyEntityTransport,
]  # No auto-discovery of transport plugins so manual class registration required here
async def async_get_service(
    hass: HomeAssistant,
    config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> SupernotifyAction:
    """Notify specific component setup - see async_setup_legacy in legacy BaseNotificationService

    Builds and initializes the ``SupernotifyAction`` from the platform ``config``,
    then registers the supplemental enquiry / maintenance actions under ``DOMAIN``.

    Args:
        hass: Home Assistant instance used for reload support and action registration.
        config: Platform configuration; the ``CONF_*`` keys read below are
            accessed by subscript, so they must all be present.
        discovery_info: Accepted for signature compatibility, not used.

    Returns:
        The initialized ``SupernotifyAction``.
    """
    _ = PLATFORM_SCHEMA  # schema must be imported even if not used for HA platform detection
    _ = discovery_info
    # NOTE: validation of per-delivery conditions is not performed here.

    await async_setup_reload_service(hass, DOMAIN, PLATFORMS)
    service = SupernotifyAction(
        hass,
        deliveries=config[CONF_DELIVERY],
        template_path=config[CONF_TEMPLATE_PATH],
        media_path=config[CONF_MEDIA_PATH],
        archive=config[CONF_ARCHIVE],
        housekeeping=config[CONF_HOUSEKEEPING],
        mobile_discovery=config[CONF_MOBILE_DISCOVERY],
        recipients_discovery=config[CONF_RECIPIENTS_DISCOVERY],
        recipients=config[CONF_RECIPIENTS],
        mobile_actions=config[CONF_ACTION_GROUPS],
        scenarios=config[CONF_SCENARIOS],
        links=config[CONF_LINKS],
        transport_configs=config[CONF_TRANSPORTS],
        cameras=config[CONF_CAMERAS],
        dupe_check=config[CONF_DUPE_CHECK],
        snooze=config[CONF_SNOOZE],
    )
    await service.initialize()

    def enquire_configuration(_call: ServiceCall) -> dict[str, Any]:
        # (key, fallback) pairs in the order they appear in the response
        fallbacks: tuple[tuple[str, Any], ...] = (
            (CONF_DELIVERY, {}),
            (CONF_LINKS, ()),
            (CONF_TEMPLATE_PATH, None),
            (CONF_MEDIA_PATH, None),
            (CONF_ARCHIVE, {}),
            (CONF_MOBILE_DISCOVERY, ()),
            (CONF_RECIPIENTS_DISCOVERY, ()),
            (CONF_RECIPIENTS, ()),
            (CONF_ACTIONS, {}),
            (CONF_HOUSEKEEPING, {}),
            (CONF_ACTION_GROUPS, {}),
            (CONF_SCENARIOS, {}),
            (CONF_TRANSPORTS, {}),
            (CONF_CAMERAS, {}),
            (CONF_DUPE_CHECK, {}),
            (CONF_SNOOZE, {}),
        )
        snapshot: dict[str, Any] = {key: config.get(key, fallback) for key, fallback in fallbacks}
        # scenarios are reported by name only, not with their full definition
        snapshot[CONF_SCENARIOS] = list(snapshot[CONF_SCENARIOS].keys())
        return snapshot

    def refresh_entities(_call: ServiceCall) -> None:
        return service.expose_entities()

    def enquire_implicit_deliveries(_call: ServiceCall) -> dict[str, Any]:
        return service.enquire_implicit_deliveries()

    def enquire_deliveries_by_scenario(_call: ServiceCall) -> dict[str, Any]:
        return service.enquire_deliveries_by_scenario()

    def enquire_last_notification(_call: ServiceCall) -> dict[str, Any]:
        latest = service.last_notification
        if latest:
            return latest.contents()
        return {}

    async def enquire_active_scenarios(call: ServiceCall) -> dict[str, Any]:
        want_trace = call.data.get("trace", False)
        response: dict[str, Any] = {"scenarios": await service.enquire_active_scenarios()}
        if want_trace:
            response["trace"] = await service.trace_active_scenarios()
        return response

    def enquire_scenarios(_call: ServiceCall) -> dict[str, Any]:
        return {"scenarios": service.enquire_scenarios()}

    async def enquire_occupancy(_call: ServiceCall) -> dict[str, Any]:
        # NOTE(review): occupancy is returned under the "scenarios" key; looks like
        # a copy/paste leftover but is part of the response contract - confirm.
        return {"scenarios": await service.enquire_occupancy()}

    def enquire_snoozes(_call: ServiceCall) -> dict[str, Any]:
        return {"snoozes": service.enquire_snoozes()}

    def clear_snoozes(_call: ServiceCall) -> dict[str, Any]:
        return {"cleared": service.clear_snoozes()}

    def enquire_recipients(_call: ServiceCall) -> dict[str, Any]:
        return {"recipients": service.enquire_recipients()}

    async def purge_archive(call: ServiceCall) -> dict[str, Any]:
        days = call.data.get("days")
        archive = service.context.archive
        if not archive.enabled:
            return {"error": "No archive configured"}
        purged = await archive.cleanup(days=days, force=True)
        remaining = await archive.size()
        return {
            "purged": purged,
            "remaining": remaining,
            "interval": ARCHIVE_PURGE_MIN_INTERVAL,
            "days": service.context.archive.archive_days if days is None else days,
        }

    async def purge_media(call: ServiceCall) -> dict[str, Any]:
        days = call.data.get("days")
        storage = service.context.media_storage
        if not storage.media_path:
            return {"error": "No media storage configured"}
        purged = await storage.cleanup(days=days, force=True)
        remaining = await storage.size()
        return {
            "purged": purged,
            "remaining": remaining,
            "interval": service.context.media_storage.purge_minute_interval,
            "days": service.context.media_storage.days if days is None else days,
        }

    # Actions that only ever return a response, registered in this order
    response_only_actions = (
        ("enquire_configuration", enquire_configuration),
        ("enquire_implicit_deliveries", enquire_implicit_deliveries),
        ("enquire_deliveries_by_scenario", enquire_deliveries_by_scenario),
        ("enquire_last_notification", enquire_last_notification),
        ("enquire_active_scenarios", enquire_active_scenarios),
        ("enquire_scenarios", enquire_scenarios),
        ("enquire_occupancy", enquire_occupancy),
        ("enquire_recipients", enquire_recipients),
        ("enquire_snoozes", enquire_snoozes),
        ("clear_snoozes", clear_snoozes),
        ("purge_archive", purge_archive),
        ("purge_media", purge_media),
    )
    for action_name, handler in response_only_actions:
        hass.services.async_register(
            DOMAIN,
            action_name,
            handler,
            supports_response=SupportsResponse.ONLY,
        )
    # refresh_entities is the one action that returns nothing
    hass.services.async_register(
        DOMAIN,
        "refresh_entities",
        refresh_entities,
        supports_response=SupportsResponse.NONE,
    )

    return service
class SupernotifyEntity(NotifyEntity):
    """Implement supernotify as a NotifyEntity platform.

    Thin adapter: every message is forwarded to the wrapped ``SupernotifyAction``.
    """

    _attr_has_entity_name = True
    _attr_name = "supernotify"

    def __init__(
        self,
        unique_id: str,
        platform: SupernotifyAction,
    ) -> None:
        """Initialize the SuperNotify entity.

        Args:
            unique_id: Stored as the entity's unique id.
            platform: The action object that messages are forwarded to.
        """
        self._platform = platform
        self._attr_supported_features = NotifyEntityFeature.TITLE
        self._attr_unique_id = unique_id

    async def async_send_message(
        self, message: str, title: str | None = None, target: str | list[str] | None = None, data: dict[str, Any] | None = None
    ) -> None:
        """Send a message to a user by delegating to the wrapped platform."""
        forwarded: dict[str, Any] = {"title": title, "target": target, "data": data}
        await self._platform.async_send_message(message, **forwarded)
class SupernotifyAction(BaseNotificationService):
    """Implement SuperNotify Action

    Owns the shared ``Context`` (registries, archive, media storage, snoozer),
    sends notifications through it, mirrors internal state into Home Assistant
    entities, and answers the supplemental enquiry actions.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        deliveries: dict[str, dict[str, Any]] | None = None,
        template_path: str | None = None,
        media_path: str | None = None,
        archive: dict[str, Any] | None = None,
        housekeeping: dict[str, Any] | None = None,
        recipients_discovery: bool = True,
        mobile_discovery: bool = True,
        recipients: list[dict[str, Any]] | None = None,
        mobile_actions: dict[str, Any] | None = None,
        scenarios: dict[str, dict[str, Any]] | None = None,
        links: list[str] | None = None,
        transport_configs: dict[str, Any] | None = None,
        cameras: list[dict[str, Any]] | None = None,
        dupe_check: dict[str, Any] | None = None,
        snooze: dict[str, Any] | None = None,
    ) -> None:
        """Initialize the service.

        Only constructs the collaborating objects; ``initialize()`` must be
        awaited afterwards to complete set up. ``None`` for any of the optional
        collection arguments is replaced by an empty collection where shown below.
        """
        self.last_notification: Notification | None = None
        self.failures: int = 0
        self.housekeeping: dict[str, Any] = housekeeping or {}
        self.sent: int = 0
        hass_api = HomeAssistantAPI(hass)
        self.context = Context(
            hass_api,
            PeopleRegistry(recipients or [], hass_api, discover=recipients_discovery, mobile_discovery=mobile_discovery),
            ScenarioRegistry(scenarios or {}),
            DeliveryRegistry(deliveries or {}, transport_configs or {}, TRANSPORTS),
            DupeChecker(dupe_check or {}),
            NotificationArchive(archive or {}, hass_api),
            MediaStorage(media_path, self.housekeeping.get(CONF_MEDIA_STORAGE_DAYS, 7)),
            Snoozer(snooze),
            links or [],
            recipients or [],
            mobile_actions,
            template_path,
            cameras=cameras,
        )

        # entity ids published by expose_entity(), used for the state subscription
        self.exposed_entities: list[str] = []

    async def initialize(self) -> None:
        """Initialize the context and registries, expose entities and subscribe to events.

        A housekeeping timer is only set up when ``CONF_HOUSEKEEPING_TIME`` is
        present (and truthy) in the housekeeping configuration.
        """
        await self.context.initialize()
        self.context.hass_api.initialize()
        self.context.people_registry.initialize()
        await self.context.delivery_registry.initialize(self.context)
        await self.context.scenario_registry.initialize(
            self.context.delivery_registry,
            self.context.mobile_actions,
            self.context.hass_api,
        )
        await self.context.archive.initialize()
        await self.context.media_storage.initialize(self.context.hass_api)

        self.expose_entities()
        self.context.hass_api.subscribe_event("mobile_app_notification_action", self.on_mobile_action)
        self.context.hass_api.subscribe_state(self.exposed_entities, self._entity_state_change_listener)

        housekeeping_schedule = self.housekeeping.get(CONF_HOUSEKEEPING_TIME)
        if housekeeping_schedule:
            _LOGGER.info("SUPERNOTIFY setting up housekeeping schedule at: %s", housekeeping_schedule)
            self.context.hass_api.subscribe_time(
                housekeeping_schedule.hour, housekeeping_schedule.minute, housekeeping_schedule.second, self.async_nightly_tasks
            )

        self.context.hass_api.subscribe_event(EVENT_HOMEASSISTANT_STOP, self.async_shutdown)

    async def async_shutdown(self, event: Event) -> None:
        """Handle the Home Assistant stop event by shutting down."""
        _LOGGER.info("SUPERNOTIFY shutting down, %s (%s)", event.event_type, event.time_fired)
        self.shutdown()

    async def async_unregister_services(self) -> None:
        """Shut down, then defer to the base class to unregister services."""
        _LOGGER.info("SUPERNOTIFY unregistering")
        self.shutdown()
        return await super().async_unregister_services()

    def shutdown(self) -> None:
        """Disconnect the Home Assistant API wrapper."""
        self.context.hass_api.disconnect()
        _LOGGER.info("SUPERNOTIFY shut down")

    async def async_send_message(
        self, message: str = "", title: str | None = None, target: list[str] | str | None = None, **kwargs: Any
    ) -> None:
        """Send a message via chosen transport.

        Builds a ``Notification``, delivers it, updates the sent / failure
        counter sensors and archives the notification. Exceptions are logged
        and counted rather than propagated.

        Args:
            message: Notification body.
            title: Optional title.
            target: Optional target or list of targets.
            **kwargs: Only ``ATTR_DATA`` is read; a missing or ``None`` value is
                treated as an empty dict.
        """
        # `or {}` also covers callers that pass data=None explicitly
        data = kwargs.get(ATTR_DATA) or {}
        notification = None
        _LOGGER.debug("Message: %s, target: %s, data: %s", message, target, data)

        try:
            notification = Notification(self.context, message, title, target, data)
            await notification.initialize()
            if await notification.deliver():
                self.sent += 1
                self.context.hass_api.set_state(f"sensor.{DOMAIN}_notifications", self.sent)
            elif notification.failed:
                _LOGGER.error("SUPERNOTIFY Failed to deliver %s, error count %s", notification.id, notification.error_count)
            else:
                # NOTE(review): the "no envelopes" text is used when delivered != 0,
                # which reads as if the two branches may be swapped - confirm intent.
                if notification.delivered == 0:
                    codes: list[SuppressionReason] = notification._skip_reasons
                    reason: str = ",".join(str(code) for code in codes)
                    # a pure duplicate suppression is expected, so not a problem
                    problem: bool = codes != [SuppressionReason.DUPE]
                else:
                    problem = True
                    reason = "No delivery envelopes generated"
                if problem:
                    _LOGGER.warning("SUPERNOTIFY No deliveries made for %s: %s", notification.id, reason)
                else:
                    _LOGGER.debug("SUPERNOTIFY Deliveries suppressed for %s: %s", notification.id, reason)

        except Exception as err:
            # fault barrier of last resort, integration failures should be caught within envelope delivery
            _LOGGER.exception("SUPERNOTIFY Failed to send message %s", message)
            self.failures += 1
            if notification is not None:
                notification._delivery_error = format_exception(err)
            self.context.hass_api.set_state(f"sensor.{DOMAIN}_failures", self.failures)

        if notification is None:
            _LOGGER.warning("SUPERNOTIFY NULL Notification, %s", message)
        else:
            self.last_notification = notification
            await self.context.archive.archive(notification)
            _LOGGER.debug(
                "SUPERNOTIFY %s deliveries, %s failed, %s skipped, %s suppressed",
                notification.delivered,
                notification.failed,
                notification.skipped,
                notification.suppressed,
            )

    @staticmethod
    def _sync_enabled_flag(target: Any, kind: str, label: str, new_state: State) -> bool:
        """Align ``target.enabled`` with an on/off entity state; return True if it changed."""
        if new_state.state == STATE_OFF and target.enabled:
            target.enabled = False
            _LOGGER.info("SUPERNOTIFY Disabling %s %s", kind, label)
            return True
        if new_state.state == STATE_ON and not target.enabled:
            target.enabled = True
            _LOGGER.info("SUPERNOTIFY Enabling %s %s", kind, label)
            return True
        _LOGGER.info("SUPERNOTIFY No change to %s %s, already %s", kind, label, new_state)
        return False

    async def _entity_state_change_listener(self, event: Event[EventStateChangedData]) -> None:
        """Apply on/off changes of exposed entities back onto the internal objects.

        The entity id prefix selects what is toggled: scenario, delivery,
        transport or recipient. Events without a new state, or for entity ids
        with none of those prefixes, are ignored.
        """
        if event is None:
            _LOGGER.warning("SUPERNOTIFY entity event with nothing to do:%s", event)
            return

        _LOGGER.debug("SUPERNOTIFY %s event for entity: %s", event.event_type, event.data)
        new_state: State | None = event.data["new_state"]
        if not new_state:
            return
        entity_id: str = event.data["entity_id"]

        scenario_prefix = f"binary_sensor.{DOMAIN}_scenario_"
        delivery_prefix = f"binary_sensor.{DOMAIN}_delivery_"
        transport_prefix = f"binary_sensor.{DOMAIN}_transport_"
        recipient_prefix = f"binary_sensor.{DOMAIN}_recipient_"

        if entity_id.startswith(scenario_prefix):
            scenario: Scenario | None = self.context.scenario_registry.scenarios.get(entity_id.removeprefix(scenario_prefix))
            if scenario is None:
                _LOGGER.warning("SUPERNOTIFY Event for unknown scenario %s", entity_id)
            else:
                self._sync_enabled_flag(scenario, "scenario", scenario.name, new_state)
        elif entity_id.startswith(delivery_prefix):
            # deliveries are toggled through the registry rather than directly
            delivery_name: str = entity_id.removeprefix(delivery_prefix)
            if new_state.state == STATE_OFF:
                self.context.delivery_registry.disable(delivery_name)
            elif new_state.state == STATE_ON:
                self.context.delivery_registry.enable(delivery_name)
            else:
                _LOGGER.info("SUPERNOTIFY No change to delivery %s for state %s", delivery_name, new_state.state)
        elif entity_id.startswith(transport_prefix):
            transport: Transport | None = self.context.delivery_registry.transports.get(
                entity_id.removeprefix(transport_prefix)
            )
            if transport is None:
                _LOGGER.warning("SUPERNOTIFY Event for unknown transport %s", entity_id)
            else:
                self._sync_enabled_flag(transport, "transport", transport.name, new_state)
        elif entity_id.startswith(recipient_prefix):
            # people are looked up by "person.<suffix>"
            recipient: Recipient | None = self.context.people_registry.people.get(
                "person." + entity_id.removeprefix(recipient_prefix)
            )
            if recipient is None:
                _LOGGER.warning("SUPERNOTIFY Event for unknown recipient %s", entity_id)
            else:
                self._sync_enabled_flag(recipient, "recipient", recipient.entity_id, new_state)

    def expose_entity(
        self,
        entity_name: str,
        state: str,
        attributes: dict[str, Any],
        platform: str = Platform.BINARY_SENSOR,
        original_name: str | None = None,
        original_icon: str | None = None,
        entity_registry: er.EntityRegistry | None = None,
    ) -> None:
        """Expose a technical entity in Home Assistant representing internal state and attributes

        When an entity registry is given, the entity is registered there and the
        registry's entity id is used. Without a registry, or if registration
        fails, the id falls back to ``{platform}.{DOMAIN}_{entity_name}``.
        Failures are logged, never raised.
        """
        # default id, so state can still be set when there is no registry entry
        entity_id: str = f"{platform}.{DOMAIN}_{entity_name}"
        if entity_registry is not None:
            try:
                entry: er.RegistryEntry = entity_registry.async_get_or_create(
                    platform,
                    DOMAIN,
                    entity_name,
                    entity_category=EntityCategory.DIAGNOSTIC,
                    original_name=original_name,
                    original_icon=original_icon,
                )
                entity_id = entry.entity_id
            except Exception as e:
                # continue anyway even if not registered as state is independent of entity
                _LOGGER.warning("SUPERNOTIFY Unable to register entity %s: %s", entity_name, e)
        try:
            self.context.hass_api.set_state(entity_id, state, attributes)
            # expose_entities() can run repeatedly (refresh action), so avoid duplicates
            if entity_id not in self.exposed_entities:
                self.exposed_entities.append(entity_id)
        except Exception as e:
            _LOGGER.error("SUPERNOTIFY Unable to set state for entity %s: %s", entity_id, e)

    def expose_entities(self) -> None:
        """Publish counters plus one entity per scenario, transport, delivery and recipient.

        Does nothing, other than logging an error, when the entity registry is unavailable.
        """
        # Create on the fly entities for key internal config and state
        ent_reg: er.EntityRegistry | None = self.context.hass_api.entity_registry()
        if ent_reg is None:
            _LOGGER.error("SUPERNOTIFY Unable to access entity registry to expose entities")
            return

        self.context.hass_api.set_state(f"sensor.{DOMAIN}_failures", self.failures)
        self.context.hass_api.set_state(f"sensor.{DOMAIN}_notifications", self.sent)

        for scenario in self.context.scenario_registry.scenarios.values():
            self.expose_entity(
                f"scenario_{scenario.name}",
                state=STATE_UNKNOWN,
                attributes=sanitize(scenario.attributes(include_condition=False)),
                original_name=f"{scenario.name} Scenario",
                original_icon="mdi:clipboard-text",
                entity_registry=ent_reg,
            )
        for transport in self.context.delivery_registry.transports.values():
            self.expose_entity(
                f"transport_{transport.name}",
                state=STATE_ON if transport.enabled else STATE_OFF,
                attributes=sanitize(transport.attributes()),
                original_name=f"{transport.name} Transport Adaptor",
                original_icon="mdi:truck-fast",
                entity_registry=ent_reg,
            )

        for delivery in self.context.delivery_registry.deliveries.values():
            self.expose_entity(
                f"delivery_{delivery.name}",
                state=STATE_ON if delivery.enabled else STATE_OFF,
                attributes=sanitize(delivery.attributes()),
                original_name=f"{delivery.name} Delivery Configuration",
                original_icon="mdi:package-variant",
                entity_registry=ent_reg,
            )

        for recipient in self.context.people_registry.people.values():
            self.expose_entity(
                f"recipient_{recipient.name}",
                state=STATE_ON if recipient.enabled else STATE_OFF,
                attributes=sanitize(recipient.attributes()),
                original_name=f"{recipient.name}",
                original_icon="mdi:account-arrow-left",
                entity_registry=ent_reg,
            )

    def enquire_implicit_deliveries(self) -> dict[str, Any]:
        """Return implicit delivery names grouped by transport name.

        Transports with no implicit delivery are omitted.
        """
        grouped: dict[str, list[str]] = {}
        for transport_name in self.context.delivery_registry.transports:
            names = [
                d.name for d in self.context.delivery_registry.implicit_deliveries if d.transport.name == transport_name
            ]
            if names:
                grouped[transport_name] = names
        return grouped

    def enquire_deliveries_by_scenario(self) -> dict[str, dict[str, list[str]]]:
        """Return enabled / disabled / applicable deliveries for each enabled scenario."""
        return {
            name: {
                "enabled": scenario.enabling_deliveries(),
                "disabled": scenario.disabling_deliveries(),
                "applies": scenario.relevant_deliveries(),
            }
            for name, scenario in self.context.scenario_registry.scenarios.items()
            if scenario.enabled
        }

    async def enquire_occupancy(self) -> dict[str, list[dict[str, Any]]]:
        """Return the people registry's occupancy with each recipient as a dict."""
        occupancy = self.context.people_registry.determine_occupancy()
        return {k: [v.as_dict() for v in vs] for k, vs in occupancy.items()}

    async def enquire_active_scenarios(self) -> list[str]:
        """Return names of scenarios that evaluate true for medium priority and current occupancy."""
        occupiers: dict[str, list[Recipient]] = self.context.people_registry.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)
        return [s.name for s in self.context.scenario_registry.scenarios.values() if s.evaluate(cvars)]

    async def trace_active_scenarios(self) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
        """Trace every scenario against the current occupancy.

        Returns:
            ``(enabled, disabled, variables)``: JSON-safe attributes (with trace)
            of scenarios whose trace was truthy / falsy, plus the condition
            variables as a dict.
        """
        occupiers: dict[str, list[Recipient]] = self.context.people_registry.determine_occupancy()
        cvars = ConditionVariables([], [], [], PRIORITY_MEDIUM, occupiers, None, None)

        def safe_json(v: Any) -> Any:
            # round trip through the extended encoder to get plain JSON types
            return json.loads(json.dumps(v, cls=ExtendedJSONEncoder))

        enabled = []
        disabled = []
        dcvars = asdict(cvars)
        for s in self.context.scenario_registry.scenarios.values():
            if await s.trace(cvars):
                enabled.append(safe_json(s.attributes(include_trace=True)))
            else:
                disabled.append(safe_json(s.attributes(include_trace=True)))
        return enabled, disabled, dcvars

    def enquire_scenarios(self) -> dict[str, dict[str, Any]]:
        """Return attributes (without conditions) of every scenario, keyed by name."""
        return {s.name: s.attributes(include_condition=False) for s in self.context.scenario_registry.scenarios.values()}

    def enquire_snoozes(self) -> list[dict[str, Any]]:
        """Return the snoozer's exported snoozes."""
        return self.context.snoozer.export()

    def clear_snoozes(self) -> int:
        """Clear all snoozes and return the snoozer's result."""
        return self.context.snoozer.clear()

    def enquire_recipients(self) -> list[dict[str, Any]]:
        """Return every registered person as a dict."""
        return [p.as_dict() for p in self.context.people_registry.people.values()]

    @callback
    def on_mobile_action(self, event: Event) -> None:
        """Listen for mobile actions relevant to snooze and silence notifications

        Only events whose action starts with ``SUPERNOTIFY_`` are passed to the snoozer.

        Example Action:
        event_type: mobile_app_notification_action
        data:
            foo: a
        origin: REMOTE
        time_fired: "2024-04-20T13:14:09.360708+00:00"
        context:
            id: 01HVXT93JGWEDW0KE57Z0X6Z1K
            parent_id: null
            user_id: a9dbae1a5abf33dbbad52ff82201bb17
        """
        event_name = event.data.get(ATTR_ACTION)
        if event_name is None or not event_name.startswith("SUPERNOTIFY_"):
            return  # event not intended for here
        self.context.snoozer.handle_command_event(event, self.context.people_registry.enabled_recipients())

    # NOTE(review): @callback is applied to a coroutine function here; the
    # decorator is normally meant for synchronous functions - confirm it is wanted.
    @callback
    async def async_nightly_tasks(self, now: dt.datetime) -> None:
        """Run scheduled housekeeping: archive cleanup, snooze purge, media cleanup."""
        _LOGGER.info("SUPERNOTIFY Housekeeping starting as scheduled at %s", now)
        await self.context.archive.cleanup()
        self.context.snoozer.purge_snoozes()
        await self.context.media_storage.cleanup()
        _LOGGER.info("SUPERNOTIFY Housekeeping completed")