Coverage for custom_components/supernotify/transport.py: 91%
119 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
1# mypy: disable-error-code="name-defined"
3import logging
4import time
5from abc import abstractmethod
6from traceback import format_exception
7from typing import TYPE_CHECKING, Any
8from urllib.parse import urlparse
10from homeassistant.components.notify.const import ATTR_TARGET
11from homeassistant.const import (
12 ATTR_DEVICE_ID,
13 ATTR_ENTITY_ID,
14 ATTR_FRIENDLY_NAME,
15 ATTR_NAME,
16)
17from homeassistant.helpers.typing import ConfigType
18from homeassistant.util import dt as dt_util
20from custom_components.supernotify.common import CallRecord
21from custom_components.supernotify.context import Context
22from custom_components.supernotify.model import DeliveryConfig, Target, TargetRequired, TransportConfig
24from . import (
25 ATTR_ENABLED,
26 CONF_DELIVERY_DEFAULTS,
27 CONF_DEVICE_DISCOVERY,
28 CONF_DEVICE_DOMAIN,
29)
31if TYPE_CHECKING:
32 import datetime as dt
34 from .delivery import Delivery, DeliveryRegistry
35 from .hass_api import HomeAssistantAPI
36 from .people import PeopleRegistry
38_LOGGER = logging.getLogger(__name__)
41class Transport:
42 """Base class for delivery transports.
44 Sub classes integrste with Home Assistant notification services
45 or alternative notification mechanisms.
46 """
48 name: str
50 @abstractmethod
51 def __init__(self, context: Context, transport_config: ConfigType | None = None) -> None:
52 self.hass_api: HomeAssistantAPI = context.hass_api
53 self.people_registry: PeopleRegistry = context.people_registry
54 self.delivery_registry: DeliveryRegistry = context.delivery_registry
55 self.context: Context = context
57 self.transport_config = TransportConfig(transport_config or {}, class_config=self.default_config)
59 self.delivery_defaults: DeliveryConfig = self.transport_config.delivery_defaults
60 self.device_domain: list[str] = self.transport_config.device_domain or []
61 self.device_discovery: bool | None = self.transport_config.device_discovery
62 self.enabled = self.transport_config.enabled
63 self.override_enabled = self.enabled
64 self.alias = self.transport_config.alias
65 self.last_error_at: dt.datetime | None = None
66 self.last_error_in: str | None = None
67 self.last_error_message: str | None = None
68 self.error_count: int = 0
70 async def initialize(self) -> None:
71 """Async post-construction initialization"""
72 if self.name is None:
73 raise ValueError("No transport configured")
75 if self.device_discovery:
76 for domain in self.device_domain:
77 discovered: int = 0
78 added: int = 0
79 for d in self.hass_api.discover_devices(domain):
80 discovered += 1
81 if self.delivery_defaults.target is None:
82 self.delivery_defaults.target = Target()
83 if d.id not in self.delivery_defaults.target.device_ids:
84 _LOGGER.info(f"SUPERNOTIFY Discovered device {d.name} for {domain}, id {d.id}")
85 self.delivery_defaults.target.extend(ATTR_DEVICE_ID, d.id)
86 added += 1
88 _LOGGER.info(f"SUPERNOTIFY device discovery for {domain} found {discovered} devices, added {added} new ones")
90 @property
91 def targets(self) -> Target:
92 return self.delivery_defaults.target if self.delivery_defaults.target is not None else Target()
94 @property
95 def default_config(self) -> TransportConfig:
96 return TransportConfig()
98 @property
99 def auto_configure(self) -> bool:
100 return False
102 def validate_action(self, action: str | None) -> bool:
103 """Override in subclass if transport has fixed action or doesn't require one"""
104 return action == self.delivery_defaults.action
106 def attributes(self) -> dict[str, Any]:
107 attrs: dict[str, Any] = {
108 ATTR_NAME: self.name,
109 ATTR_ENABLED: self.override_enabled,
110 CONF_DEVICE_DOMAIN: self.device_domain,
111 CONF_DEVICE_DISCOVERY: self.device_discovery,
112 CONF_DELIVERY_DEFAULTS: self.delivery_defaults,
113 }
114 if self.alias:
115 attrs[ATTR_FRIENDLY_NAME] = self.alias
116 if self.last_error_at:
117 attrs["last_error_at"] = self.last_error_at.isoformat()
118 attrs["last_error_in"] = self.last_error_in
119 attrs["last_error_message"] = self.last_error_message
120 attrs["error_count"] = self.error_count
121 return attrs
123 @abstractmethod
124 async def deliver(self, envelope: "Envelope") -> bool: # noqa: F821 # type: ignore
125 """Delivery implementation
127 Args:
128 ----
129 envelope (Envelope): envelope to be delivered
131 """
133 def set_action_data(self, action_data: dict[str, Any], key: str, data: Any | None) -> Any:
134 if data is not None:
135 action_data[key] = data
136 return action_data
138 async def call_action(
139 self,
140 envelope: "Envelope", # noqa: F821 # type: ignore
141 qualified_action: str | None = None,
142 action_data: dict[str, Any] | None = None,
143 target_data: dict[str, Any] | None = None,
144 implied_target: bool = False, # True if the qualified action implies a target
145 ) -> bool:
146 action_data = action_data or {}
147 start_time = time.time()
148 domain = service = None
149 delivery: Delivery = envelope.delivery
150 try:
151 qualified_action = qualified_action or delivery.action
152 if qualified_action and (
153 action_data.get(ATTR_TARGET)
154 or action_data.get(ATTR_ENTITY_ID)
155 or implied_target
156 or delivery.target_required != TargetRequired.ALWAYS
157 or target_data
158 ):
159 domain, service = qualified_action.split(".", 1)
160 start_time = time.time()
161 if target_data:
162 # home-assistant messes with the service_data passed by ref
163 service_data_as_sent = dict(action_data)
164 service_response = await self.hass_api.call_service(
165 domain, service, service_data=action_data, target_data=target_data, debug=delivery.debug
166 )
167 envelope.calls.append(
168 CallRecord(
169 time.time() - start_time,
170 domain,
171 service,
172 debug=delivery.debug,
173 action_data=service_data_as_sent,
174 target_data=dict(target_data),
175 service_response=service_response,
176 )
177 )
178 else:
179 service_data_as_sent = dict(action_data)
180 service_response = await self.hass_api.call_service(
181 domain, service, service_data=action_data, debug=delivery.debug
182 )
183 envelope.calls.append(
184 CallRecord(
185 time.time() - start_time,
186 domain,
187 service,
188 debug=delivery.debug,
189 action_data=service_data_as_sent,
190 service_response=service_response,
191 )
192 )
193 envelope.delivered = 1
194 else:
195 _LOGGER.debug(
196 "SUPERNOTIFY skipping action call for service %s, targets %s",
197 qualified_action,
198 action_data.get(ATTR_TARGET),
199 )
200 envelope.skipped = 1
201 return True
202 except Exception as e:
203 self.record_error(str(e), method="call_action")
204 envelope.failed_calls.append(
205 CallRecord(time.time() - start_time, domain, service, action_data, target_data, exception=str(e))
206 )
207 _LOGGER.exception("SUPERNOTIFY Failed to notify %s via %s, data=%s", self.name, qualified_action, action_data)
208 envelope.errored += 1
209 envelope.delivery_error = format_exception(e)
210 return False
212 def record_error(self, message: str, method: str) -> None:
213 self.last_error_at = dt_util.utcnow()
214 self.last_error_message = message
215 self.last_error_in = method
216 self.error_count += 1
218 def simplify(self, text: str | None, strip_urls: bool = False) -> str | None:
219 """Simplify text for delivery transports with speaking or plain text interfaces"""
220 if not text:
221 return None
222 if strip_urls:
223 words = text.split()
224 text = " ".join(word for word in words if not urlparse(word).scheme)
225 text = text.translate(str.maketrans("_", " ", "()£$<>"))
226 _LOGGER.debug("SUPERNOTIFY Simplified text to: %s", text)
227 return text