Coverage for custom_components/supernotify/hass_api.py: 88%
425 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-03-28 19:39 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-03-28 19:39 +0000
1from __future__ import annotations
3import logging
4from dataclasses import dataclass
5from functools import partial
6from typing import TYPE_CHECKING, Any
8from homeassistant.components.person import ATTR_USER_ID
9from homeassistant.const import CONF_ACTION, CONF_DEVICE_ID
10from homeassistant.helpers.aiohttp_client import async_get_clientsession
11from homeassistant.helpers.event import async_track_state_change_event, async_track_time_change
12from homeassistant.util import slugify
14if TYPE_CHECKING:
15 import asyncio
16 from collections.abc import Callable, Iterable, Iterator
18 import aiohttp
19 from homeassistant.core import CALLBACK_TYPE, HomeAssistant, Service, ServiceResponse, State
20 from homeassistant.helpers.entity import Entity
21 from homeassistant.helpers.entity_registry import EntityRegistry
22 from homeassistant.helpers.typing import ConfigType
23 from homeassistant.util.event_type import EventType
25 from .schema import ConditionsFunc
27import socket
28import threading
29from contextlib import contextmanager
30from typing import TYPE_CHECKING, cast
32import homeassistant.components.trace
33from homeassistant.components import mqtt
34from homeassistant.components.group import expand_entity_ids
35from homeassistant.components.trace.const import DATA_TRACE
36from homeassistant.components.trace.models import ActionTrace
37from homeassistant.components.trace.util import async_store_trace
38from homeassistant.core import Context as HomeAssistantContext
39from homeassistant.core import HomeAssistant, SupportsResponse
40from homeassistant.exceptions import ConditionError, ConditionErrorContainer, IntegrationError
41from homeassistant.helpers import condition as condition
42from homeassistant.helpers import device_registry as dr
43from homeassistant.helpers import entity_registry as er
44from homeassistant.helpers import issue_registry as ir
45from homeassistant.helpers.json import json_dumps
46from homeassistant.helpers.network import get_url
47from homeassistant.helpers.template import Template
48from homeassistant.helpers.trace import trace_get, trace_path
49from homeassistant.helpers.typing import ConfigType
51from . import DOMAIN
52from .const import CONF_DEVICE_LABELS, CONF_DEVICE_TRACKER, CONF_MOBILE_APP_ID
53from .model import ConditionVariables, SelectionRule
55if TYPE_CHECKING:
56 from homeassistant.core import HomeAssistant
57 from homeassistant.helpers.device_registry import DeviceEntry, DeviceRegistry
58 from homeassistant.helpers.typing import ConfigType
60# Defined locally to avoid importing from homeassistant.components.mobile_app.const and triggering its dependency chain
62CONF_USER_ID = "user_id"
63ATTR_OS_NAME = "os_name"
64ATTR_OS_VERSION = "os_version"
65ATTR_APP_VERSION = "app_version"
66ATTR_DEVICE_NAME = "device_name"
67ATTR_MANUFACTURER = "manufacturer"
68ATTR_MODEL = "model"
# Module-level logger, named after this module
70_LOGGER = logging.getLogger(__name__)
@dataclass
class DeviceInfo:
    """Details collected about a single device.

    Only ``device_id`` is required; every other field defaults to ``None``.
    """

    device_id: str
    device_labels: list[str] | None = None
    mobile_app_id: str | None = None
    device_name: str | None = None
    device_tracker: str | None = None
    action: str | None = None
    user_id: str | None = None
    area_id: str | None = None
    manufacturer: str | None = None
    model: str | None = None
    os_name: str | None = None
    os_version: str | None = None
    app_version: str | None = None
    identifiers: set[tuple[str, str]] | None = None

    def as_dict(self) -> dict[str, str | list[str] | None]:
        """Return the device details keyed by configuration / attribute name.

        ``area_id`` and ``identifiers`` are not included in the mapping.
        """
        return {
            CONF_MOBILE_APP_ID: self.mobile_app_id,
            ATTR_DEVICE_NAME: self.device_name,
            CONF_DEVICE_ID: self.device_id,
            CONF_USER_ID: self.user_id,
            CONF_DEVICE_TRACKER: self.device_tracker,
            CONF_ACTION: self.action,
            ATTR_OS_NAME: self.os_name,
            ATTR_OS_VERSION: self.os_version,
            ATTR_APP_VERSION: self.app_version,
            ATTR_MANUFACTURER: self.manufacturer,
            ATTR_MODEL: self.model,
            CONF_DEVICE_LABELS: self.device_labels,
        }

    def __eq__(self, other: Any) -> bool:
        """Test support: equal when both sides produce the same ``as_dict()``.

        Objects without a callable ``as_dict`` (including ``None``) yield
        ``NotImplemented``, so ``==`` evaluates to ``False`` instead of raising
        ``AttributeError``.
        """
        other_as_dict = getattr(other, "as_dict", None)
        if not callable(other_as_dict):
            return NotImplemented
        return other_as_dict() == self.as_dict()
111class HomeAssistantAPI:
def __init__(self, hass: HomeAssistant) -> None:
    """Keep a reference to ``hass`` and start with empty URLs, registries and caches."""
    self._hass: HomeAssistant = hass

    # Instance details, placeholders until populated
    self.hass_name: str = "!UNDEFINED!"
    self.language: str = ""
    self.internal_url: str = ""
    self.external_url: str = ""

    # Registries and per-service details are looked up later and remembered here
    self._device_registry: dr.DeviceRegistry | None = None
    self._entity_registry: er.EntityRegistry | None = None
    self._service_info: dict[tuple[str, str], Any] = {}

    # Callables that cancel the subscriptions made through this object
    self.unsubscribes: list[CALLBACK_TYPE] = []

    # Mobile app lookups, one index per key type
    self.mobile_apps_by_user_id: dict[str, list[DeviceInfo]] = {}
    self.mobile_apps_by_device_id: dict[str, DeviceInfo] = {}
    self.mobile_apps_by_app_id: dict[str, DeviceInfo] = {}
    self.mobile_apps_by_tracker: dict[str, DeviceInfo] = {}
def initialize(self) -> None:
    """Read instance name, language and URLs from hass, then build the mobile app cache.

    If the internal URL lookup fails, ``http://<hostname>`` is used instead; if the
    external URL lookup fails, the internal URL is reused. Both failures are logged
    as warnings rather than raised.
    """
    self.hass_name = self._hass.config.location_name
    self.language = self._hass.config.language

    try:
        self.internal_url = get_url(self._hass, prefer_external=False)
    except Exception as err:
        self.internal_url = f"http://{socket.gethostname()}"
        _LOGGER.warning("SUPERNOTIFY could not get internal hass url, defaulting to %s: %s", self.internal_url, err)

    try:
        self.external_url = get_url(self._hass, prefer_external=True)
    except Exception as err:
        _LOGGER.warning("SUPERNOTIFY could not get external hass url, defaulting to internal url: %s", err)
        self.external_url = self.internal_url

    self.build_mobile_app_cache()

    _LOGGER.debug(
        "SUPERNOTIFY Configured for HomeAssistant instance %s at %s , %s",
        self.hass_name,
        self.internal_url,
        self.external_url,
    )

    # Sanity check only: an unusable internal URL is reported, not rejected
    if not (self.internal_url and self.internal_url.startswith("http")):
        _LOGGER.warning("SUPERNOTIFY invalid internal hass url %s", self.internal_url)
def disconnect(self) -> None:
    """Cancel every registered subscription, most recently added first.

    A failing cancel callback is logged and does not stop the remaining ones.
    """
    pending = self.unsubscribes
    while pending:
        cancel = pending.pop()
        try:
            _LOGGER.debug("SUPERNOTIFY unsubscribing: %s", cancel)
            cancel()
        except Exception as err:
            _LOGGER.error("SUPERNOTIFY failed to unsubscribe: %s", err)
    _LOGGER.debug("SUPERNOTIFY disconnection complete")
def subscribe_event(self, event: EventType | str, callback: Callable) -> None:
    """Listen for ``event`` on the hass bus and remember the handle for ``disconnect``."""
    cancel = self._hass.bus.async_listen(event, callback)
    self.unsubscribes.append(cancel)
def subscribe_state(self, entity_ids: str | Iterable[str], callback: Callable) -> None:
    """Track state change events for ``entity_ids`` and remember the handle for ``disconnect``."""
    cancel = async_track_state_change_event(self._hass, entity_ids, callback)
    self.unsubscribes.append(cancel)
def subscribe_time(self, hour: int, minute: int, second: int, callback: Callable) -> None:
    """Register ``callback`` with ``async_track_time_change`` and remember the handle for ``disconnect``."""
    cancel = async_track_time_change(self._hass, callback, hour=hour, minute=minute, second=second)
    self.unsubscribes.append(cancel)
def in_hass_loop(self) -> bool:
    """Return True when the current thread ident equals ``hass.loop_thread_id``."""
    if self._hass is None:
        return False
    return self._hass.loop_thread_id == threading.get_ident()
def get_state(self, entity_id: str) -> State | None:
    """Return whatever ``hass.states.get`` holds for ``entity_id``."""
    states = self._hass.states
    return states.get(entity_id)
def is_state(self, entity_id: str, state: str) -> bool:
    """Delegate to ``hass.states.is_state`` for ``entity_id`` and ``state``."""
    states = self._hass.states
    return states.is_state(entity_id, state)
def set_state(self, entity_id: str, state: str | int | bool, attributes: dict[str, Any] | None = None) -> None:
    """Write ``state`` (converted with ``str``) and ``attributes`` for ``entity_id``.

    Uses ``states.async_set`` when ``in_hass_loop()`` is true, ``states.set`` otherwise.
    """
    write = self._hass.states.async_set if self.in_hass_loop() else self._hass.states.set
    write(entity_id, str(state), attributes=attributes)
def has_service(self, domain: str, service: str) -> bool:
    """Delegate to ``hass.services.has_service`` for ``domain`` and ``service``."""
    services = self._hass.services
    return services.has_service(domain, service)
def entity_ids_for_domain(self, domain: str) -> list[str]:
    """Return the result of ``hass.states.async_entity_ids`` for ``domain``."""
    states = self._hass.states
    return states.async_entity_ids(domain)
def domain_entity(self, domain: str, entity_id: str) -> Entity | None:
    """Look up ``entity_id`` through the object stored under ``domain`` in ``hass.data``.

    Returns ``None`` when nothing is stored for ``domain`` or the stored object has
    no ``get_entity`` method, rather than raising ``AttributeError``.
    """
    # TODO: must be a better hass method than this
    component = self._hass.data.get(domain)
    get_entity = getattr(component, "get_entity", None)
    if get_entity is None:
        return None
    return get_entity(entity_id)
def create_job(self, func: Callable, *args: Any) -> asyncio.Future[Any]:
    """Hand a blocking ``func(*args)`` to ``hass.async_add_executor_job`` and return its awaitable."""
    submit = self._hass.async_add_executor_job
    return submit(func, *args)
def fire_event(self, event_name: str, event_data: dict[str, Any] | None = None) -> None:
    """Fire ``event_name`` with optional ``event_data`` on the hass bus."""
    bus = self._hass.bus
    bus.async_fire(event_name, event_data)
async def call_service(
    self,
    domain: str,
    service: str,
    service_data: dict[str, Any] | None = None,
    target: dict[str, Any] | None = None,
    return_response: bool | None = None,
    blocking: bool | None = None,
    debug: bool = False,
) -> ServiceResponse | None:
    """Call ``domain.service`` through hass and return its response, if any.

    When either ``return_response`` or ``blocking`` is ``None``, both are derived
    from ``service_info``: no response for ``SupportsResponse.NONE``, a response for
    ``SupportsResponse.ONLY``, otherwise a response only when ``debug`` is set;
    ``blocking`` then becomes ``return_response or debug``.
    A non-``None`` response is logged at info level when ``debug`` is true.
    """
    must_derive = return_response is None or blocking is None
    if must_derive:
        # unknown service, for example defined in generic action, check if it supports response
        supported: SupportsResponse = self.service_info(domain, service)
        if supported == SupportsResponse.ONLY:
            return_response = True
        elif supported == SupportsResponse.NONE:
            return_response = False
        else:
            return_response = debug
        blocking = return_response or debug

    reply: ServiceResponse | None = await self._hass.services.async_call(
        domain,
        service,
        service_data=service_data,
        blocking=blocking,
        context=None,
        target=target,
        return_response=return_response,
    )
    if debug and reply is not None:
        _LOGGER.info("SUPERNOTIFY Service %s.%s response: %s", domain, service, reply)
    return reply
def service_info(self, domain: str, service: str) -> SupportsResponse:
    """Return the ``supports_response`` setting of ``domain.service``.

    Details of a registered service (``supports_response`` and ``schema``) are
    cached in ``self._service_info`` on first lookup. Unknown services and lookup
    failures yield ``SupportsResponse.NONE``; failures are logged as warnings.
    """
    # Bound before the try so the final return is safe even if the lookup raises early
    supports_response: SupportsResponse | None = None
    try:
        if (domain, service) not in self._service_info:
            service_objs: dict[str, dict[str, Service]] = self._hass.services.async_services()
            service_obj: Service | None = service_objs.get(domain, {}).get(service)
            if service_obj:
                self._service_info[domain, service] = {
                    "supports_response": service_obj.supports_response,
                    "schema": service_obj.schema,
                }
        service_info: dict[str, Any] = self._service_info.get((domain, service), {})
        supports_response = service_info.get("supports_response")
        if supports_response is None:
            _LOGGER.debug("SUPERNOTIFY Unable to find service info for %s.%s", domain, service)

    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to get service info for %s.%s: %s", domain, service, e)
    return supports_response or SupportsResponse.NONE  # default to no response
def find_service(self, domain: str, module: str) -> str | None:
    """Return ``"<domain>.<service>"`` for the first service in ``domain`` implemented in ``module``.

    The module is taken from the service job's target: the ``__module__`` of the
    object a bound method is attached to, otherwise the target's own ``__module__``.
    Returns ``None`` when nothing matches or the lookup fails (failure is logged).
    """
    try:
        service_objs: dict[str, dict[str, Service]] = self._hass.services.async_services()
        if service_objs:
            for service, domain_obj in service_objs.get(domain, {}).items():
                if domain_obj.job and domain_obj.job.target:
                    handler = domain_obj.job.target
                    owner = getattr(handler, "__self__", handler)
                    target_module: str | None = owner.__module__
                    if target_module == module:
                        # arguments follow the order of the placeholders in the message
                        _LOGGER.debug("SUPERNOTIFY Found service %s for domain %s", service, domain)
                        return f"{domain}.{service}"

        _LOGGER.debug("SUPERNOTIFY Unable to find service for %s", domain)
    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to find service for %s: %s", domain, e)
    return None
def http_session(self) -> aiohttp.ClientSession:
    """Return the aiohttp client session that ``async_get_clientsession`` provides for this hass."""
    session = async_get_clientsession(self._hass)
    return session
def expand_group(self, entity_ids: str | list[str]) -> list[str]:
    """Return ``entity_ids`` as expanded by the group component's ``expand_entity_ids``."""
    expanded = expand_entity_ids(self._hass, entity_ids)
    return expanded
def template(self, template_format: str) -> Template:
    """Build a ``Template`` from ``template_format`` bound to this hass instance."""
    compiled = Template(template_format, self._hass)
    return compiled
 # Evaluate `conditions` against `condition_variables` via evaluate_conditions while an
 # ActionTrace (from trace_action) is recording. Returns (result, trace); both start as None.
289 async def trace_conditions(
290 self,
291 conditions: ConditionsFunc,
292 condition_variables: ConditionVariables,
293 trace_name: str | None = None,
294 ) -> tuple[bool | None, ActionTrace | None]:
296 result: bool | None = None
297 this_trace: ActionTrace | None = None
 # The trace component's data is expected under DATA_TRACE in hass.data; if absent, try to set it up
298 if DATA_TRACE not in self._hass.data:
299 _LOGGER.warning("SUPERNOTIFY tracing not configured, attempting to set up")
301 await homeassistant.components.trace.async_setup(self._hass, {}) # type: ignore
 # The trace is stored under trace_name, or "anon_condition" when no name is given
302 with trace_action(self._hass, trace_name or "anon_condition") as cond_trace:
303 cond_trace.set_trace(trace_get())
304 this_trace = cond_trace
305 with trace_path(["condition", "conditions"]) as _tp:
306 result = self.evaluate_conditions(conditions, condition_variables)
307 _LOGGER.debug(cond_trace.as_dict())
308 return result, this_trace
async def build_conditions(
    self, condition_config: list[ConfigType], strict: bool = False, validate: bool = False, name: str = DOMAIN
) -> ConditionsFunc | None:
    """Compile ``condition_config`` into a callable condition test and trial-run it once.

    With ``validate`` the config is first passed through
    ``condition.async_validate_conditions_config``. With ``strict`` templates are
    forced into strict mode while building, and the first condition error captured
    by the logging adaptor is raised once building has finished. Validation and
    build failures are logged and re-raised.
    """
    error_log: ConditionErrorLoggingAdaptor = ConditionErrorLoggingAdaptor(_LOGGER)
    trial_variables: ConditionVariables = ConditionVariables()
    cond_list: list[ConfigType]
    try:
        cond_list = (
            cast("list[ConfigType]", await condition.async_validate_conditions_config(self._hass, condition_config))
            if validate
            else condition_config
        )
    except Exception as e:
        _LOGGER.exception("SUPERNOTIFY Conditions validation failed: %s", e)
        raise

    try:
        if strict:
            force_strict_template_mode(cond_list, undo=False)

        test: ConditionsFunc = await condition.async_conditions_from_config(
            self._hass, cond_list, cast("logging.Logger", error_log), name
        )
        if test is None:
            raise IntegrationError(f"Invalid condition {condition_config}")
        # Trial evaluation with default variables, so problems surface at build time
        test(trial_variables.as_dict())
        return test
    except Exception as e:
        _LOGGER.exception("SUPERNOTIFY Conditions eval failed: %s", e)
        raise
    finally:
        if strict:
            force_strict_template_mode(condition_config, undo=True)
            captured = error_log.condition_errors
            if captured:
                for exception in captured:
                    _LOGGER.warning("SUPERNOTIFY Invalid condition %s:%s", condition_config, exception)
                # Raising here replaces the pending return value or exception
                raise captured[0]
def evaluate_conditions(
    self,
    conditions: ConditionsFunc,
    condition_variables: ConditionVariables,
) -> bool | None:
    """Call ``conditions`` with ``condition_variables.as_dict()`` and return its result.

    Falsy variables are reported with a warning; ``None`` is passed to ``conditions``
    when ``condition_variables`` is ``None``. Any failure is logged and re-raised.
    """
    try:
        if not condition_variables:
            _LOGGER.warning("SUPERNOTIFY No cond vars provided for condition")
        variables = None if condition_variables is None else condition_variables.as_dict()
        return conditions(variables)
    except Exception as e:
        _LOGGER.error("SUPERNOTIFY Condition eval failed: %s", e)
        raise
def abs_url(self, fragment: str | None, prefer_external: bool = True) -> str | None:
    """Turn ``fragment`` into an absolute URL.

    Returns ``None`` for an empty or missing fragment and returns the fragment
    unchanged when it already starts with ``http``. Otherwise the fragment is joined
    to the external URL (or the internal URL when ``prefer_external`` is false)
    with exactly one slash, even if the base URL ends with one.
    """
    if not fragment:
        return None
    if fragment.startswith("http"):
        return fragment
    base_url = self.external_url if prefer_external else self.internal_url
    # Trim trailing slashes from the base so "http://host/" + "/path" has no "//"
    return f"{base_url.rstrip('/')}/{fragment.removeprefix('/')}"
def raise_issue(
    self,
    issue_id: str,
    issue_key: str,
    issue_map: dict[str, str],
    severity: ir.IssueSeverity = ir.IssueSeverity.WARNING,
    learn_more_url: str = "https://supernotify.rhizomatics.org.uk",
    is_fixable: bool = False,
) -> None:
    """Create an issue for this integration's ``DOMAIN`` via ``ir.async_create_issue``.

    ``issue_key`` is passed as the translation key and ``issue_map`` as the
    translation placeholders.
    """
    issue_options: dict[str, Any] = {
        "translation_key": issue_key,
        "translation_placeholders": issue_map,
        "severity": severity,
        "learn_more_url": learn_more_url,
        "is_fixable": is_fixable,
    }
    ir.async_create_issue(self._hass, DOMAIN, issue_id, **issue_options)
def mobile_app_by_tracker(self, device_tracker: str) -> DeviceInfo | None:
    """Return the cached mobile app for a device tracker entity id, or ``None``."""
    cache = self.mobile_apps_by_tracker
    return cache.get(device_tracker)
def mobile_app_by_id(self, mobile_app_id: str) -> DeviceInfo | None:
    """Return the cached mobile app for a mobile app id, or ``None``."""
    cache = self.mobile_apps_by_app_id
    return cache.get(mobile_app_id)
def mobile_app_by_device_id(self, device_id: str) -> DeviceInfo | None:
    """Return the cached mobile app for a device id, or ``None``."""
    cache = self.mobile_apps_by_device_id
    return cache.get(device_id)
def mobile_app_by_user_id(self, user_id: str) -> list[DeviceInfo] | None:
    """Return the list of cached mobile apps for a user id, or ``None``."""
    cache = self.mobile_apps_by_user_id
    return cache.get(user_id)
def build_mobile_app_cache(self) -> None:
    """Index all enabled mobile app devices by app id, device id, tracker and user id.

    Each device's app id is ``mobile_app_<slugified device name>``. Its notify action
    is set only when a matching ``notify`` service exists, and its device tracker is
    taken from the device's ``mobile_app`` entities in the entity registry. A failure
    on one device is logged and does not stop the others. Re-running replaces a
    device's entry in the per-user list instead of adding a duplicate.
    """
    ent_reg: EntityRegistry | None = self.entity_registry()
    if not ent_reg:
        _LOGGER.warning("SUPERNOTIFY Unable to discover devices for mobile_app - no entity registry found")
        return

    found: int = 0
    complete: int = 0
    for mobile_app_info in self.discover_devices("mobile_app"):
        try:
            mobile_app_id: str = f"mobile_app_{slugify(mobile_app_info.device_name)}"
            device_tracker: str | None = None
            notify_action: str | None = None
            if self.has_service("notify", mobile_app_id):
                notify_action = f"notify.{mobile_app_id}"
            else:
                _LOGGER.warning("SUPERNOTIFY Unable to find notify action <%s>", mobile_app_id)

            registry_entries = ent_reg.entities.get_entries_for_device_id(mobile_app_info.device_id)
            for reg_entry in registry_entries:
                if reg_entry.platform == "mobile_app" and reg_entry.domain == "device_tracker":
                    device_tracker = reg_entry.entity_id

            if device_tracker and notify_action:
                complete += 1

            mobile_app_info.mobile_app_id = mobile_app_id
            mobile_app_info.device_tracker = device_tracker
            mobile_app_info.action = notify_action

            found += 1
            self.mobile_apps_by_app_id[mobile_app_id] = mobile_app_info
            self.mobile_apps_by_device_id[mobile_app_info.device_id] = mobile_app_info
            if device_tracker:
                self.mobile_apps_by_tracker[device_tracker] = mobile_app_info
            if mobile_app_info.user_id is not None:
                user_devices = self.mobile_apps_by_user_id.setdefault(mobile_app_info.user_id, [])
                # Drop any earlier entry for this device so a rebuild does not duplicate it
                user_devices[:] = [known for known in user_devices if known.device_id != mobile_app_info.device_id]
                user_devices.append(mobile_app_info)

        except Exception as e:
            _LOGGER.error("SUPERNOTIFY Failure examining device %s: %s", mobile_app_info, e)

    _LOGGER.info("SUPERNOTIFY Found %s enabled mobile app devices, %s complete config", found, complete)
def device_config_info(self, device: DeviceEntry) -> dict[str, str | None]:
    """Collect OS name, OS version, user id and app version from the device's config entries.

    Every key starts as ``None``; a truthy value in a later config entry replaces
    the value found so far.
    """
    results: dict[str, str | None] = dict.fromkeys((ATTR_OS_NAME, ATTR_OS_VERSION, CONF_USER_ID, ATTR_APP_VERSION))
    for entry_id in device.config_entries:
        entry = self._hass.config_entries.async_get_entry(entry_id)
        if not (entry and entry.data):
            continue
        for attr, current in results.items():
            results[attr] = entry.data.get(attr) or current
    return results
def discover_devices(
    self,
    discover_domain: str,
    device_model_select: SelectionRule | None = None,
    device_manufacturer_select: SelectionRule | None = None,
    device_os_select: SelectionRule | None = None,
    device_area_select: SelectionRule | None = None,
    device_label_select: SelectionRule | None = None,
) -> list[DeviceInfo]:
    """List enabled devices in the device registry that belong to ``discover_domain``.

    A device belongs to the domain when one of its identifiers has more than one
    element and starts with ``discover_domain``. Each optional selection rule that
    is supplied must match (model, manufacturer, OS name, area, labels) or the
    device is skipped. Returns an empty list when no device registry is available.
    """
    devices: list[DeviceInfo] = []
    dev_reg: DeviceRegistry | None = self.device_registry()
    if dev_reg is None or not hasattr(dev_reg, "devices"):
        _LOGGER.warning("SUPERNOTIFY Unable to discover devices for %s - no device registry found", discover_domain)
        return []

    all_devs = enabled_devs = found_devs = skipped_devs = 0
    for dev in dev_reg.devices.values():
        all_devs += 1

        if dev.disabled:
            _LOGGER.debug("SUPERNOTIFY excluded disabled device %s", dev.name)
            continue

        enabled_devs += 1
        for identifier in dev.identifiers:
            if not identifier:
                _LOGGER.debug("SUPERNOTIFY Unexpected %s device %s without id", dev.model, dev.name)
                continue
            if len(identifier) < 2 or identifier[0] != discover_domain:
                # HomeKit has triples for identifiers, other domains may behave similarly
                _LOGGER.debug("SUPERNOTIFY Ignoring device %s id: %s", dev.name, identifier)
                continue

            _LOGGER.debug("SUPERNOTIFY discovered %s device %s for id %s", dev.model, dev.name, identifier)
            found_devs += 1
            if device_model_select is not None and not device_model_select.match(dev.model):
                _LOGGER.debug("SUPERNOTIFY Skipped dev %s, no model %s match", dev.name, dev.model)
                skipped_devs += 1
                continue
            if device_manufacturer_select is not None and not device_manufacturer_select.match(dev.manufacturer):
                _LOGGER.debug("SUPERNOTIFY Skipped dev %s, no manufacturer %s match", dev.name, dev.manufacturer)
                skipped_devs += 1
                continue
            # Config entries are only consulted once the cheaper registry checks have passed
            config_info = self.device_config_info(dev)
            if device_os_select is not None and not device_os_select.match(config_info[ATTR_OS_NAME]):
                _LOGGER.debug("SUPERNOTIFY Skipped dev %s, no OS %s match", dev.name, config_info[ATTR_OS_NAME])
                skipped_devs += 1
                continue
            if device_area_select is not None and not device_area_select.match(dev.area_id):
                _LOGGER.debug("SUPERNOTIFY Skipped dev %s, no area %s match", dev.name, dev.area_id)
                skipped_devs += 1
                continue
            if device_label_select is not None and not device_label_select.match(dev.labels):
                _LOGGER.debug("SUPERNOTIFY Skipped dev %s, no label %s match", dev.name, dev.labels)
                skipped_devs += 1
                continue
            devices.append(
                DeviceInfo(
                    device_id=dev.id,
                    device_name=dev.name,
                    manufacturer=dev.manufacturer,
                    model=dev.model,
                    area_id=dev.area_id,
                    # same key that device_config_info uses to build the mapping
                    user_id=config_info[CONF_USER_ID],
                    os_name=config_info[ATTR_OS_NAME],
                    os_version=config_info[ATTR_OS_VERSION],
                    app_version=config_info[ATTR_APP_VERSION],
                    device_labels=list(dev.labels) if dev.labels else [],
                    identifiers=dev.identifiers,
                )
            )

    _LOGGER.debug(
        "SUPERNOTIFY %s device discovery, all=%s,enabled=%s ", discover_domain, all_devs, enabled_devs
    )
    _LOGGER.debug("SUPERNOTIFY %s skipped=%s, found=%s", discover_domain, skipped_devs, found_devs)

    return devices
def domain_for_device(self, device_id: str, domains: list[str]) -> str | None:
    """Return the first of ``domains`` that appears as the leading element of one of the device's identifiers.

    Identifiers of any length are accepted (not only pairs). Returns ``None`` and
    logs a warning when no device registry is available, the device is unknown, or
    none of its identifiers belongs to ``domains``.
    """
    # discover domain from device registry
    device_registry = self.device_registry()
    device: DeviceEntry | None = device_registry.async_get(device_id) if device_registry else None
    if device:
        for identifier in device.identifiers:
            # TODO: limited to first domain found, unlikely to be more
            if identifier and identifier[0] in domains:
                return identifier[0]
    _LOGGER.warning(
        "SUPERNOTIFY A target that looks like a device_id can't be matched to supported integration: %s",
        device_id,
    )
    return None
def entity_registry(self) -> er.EntityRegistry | None:
    """Return the hass entity registry, fetched once via ``er.async_get`` and then reused.

    A failed lookup is logged as a warning and ``None`` is returned; the next call tries again.
    """
    if self._entity_registry is None:
        try:
            self._entity_registry = er.async_get(self._hass)
        except Exception as e:
            _LOGGER.warning("SUPERNOTIFY Unable to get entity registry: %s", e)
    return self._entity_registry
def device_registry(self) -> dr.DeviceRegistry | None:
    """Return the hass device registry, fetched once via ``dr.async_get`` and then reused.

    A failed lookup is logged as a warning and ``None`` is returned; the next call tries again.
    """
    if self._device_registry is None:
        try:
            self._device_registry = dr.async_get(self._hass)
        except Exception as e:
            _LOGGER.warning("SUPERNOTIFY Unable to get device registry: %s", e)
    return self._device_registry
async def mqtt_available(self, raise_on_error: bool = True) -> bool:
    """Wait for the MQTT client and report whether the wait returned ``True``.

    A failure is logged with its traceback, then re-raised when ``raise_on_error``
    is set; otherwise ``False`` is returned.
    """
    try:
        client_ready = await mqtt.async_wait_for_mqtt_client(self._hass)
    except Exception:
        _LOGGER.exception("SUPERNOTIFY MQTT integration failed on available check")
        if raise_on_error:
            raise
        return False
    return client_ready is True
async def mqtt_publish(
    self, topic: str, payload: Any = None, qos: int = 0, retain: bool = False, raise_on_error: bool = True
) -> None:
    """Publish ``payload``, serialised with ``json_dumps``, to the MQTT ``topic``.

    A failure is logged with its traceback and re-raised only when ``raise_on_error`` is set.
    """
    try:
        encoded_payload = json_dumps(payload)
        await mqtt.async_publish(self._hass, topic=topic, payload=encoded_payload, qos=qos, retain=retain)
    except Exception:
        _LOGGER.exception(f"SUPERNOTIFY MQTT publish failed to {topic}")
        if raise_on_error:
            raise
603class ConditionErrorLoggingAdaptor(logging.LoggerAdapter):
604 def __init__(self, *args: Any, **kwargs: Any) -> None:
605 super().__init__(*args, **kwargs)
606 self.condition_errors: list[ConditionError] = []
608 def capture(self, args: Any) -> None:
609 if args and isinstance(args, list | tuple):
610 for arg in args:
611 if isinstance(arg, ConditionErrorContainer):
612 self.condition_errors.extend(arg.errors)
613 elif isinstance(arg, ConditionError):
614 self.condition_errors.append(arg)
616 def error(self, msg: Any, *args: object, **kwargs: Any) -> None:
617 self.capture(args)
618 self.logger.error(msg, args, kwargs)
620 def warning(self, msg: Any, *args: Any, **kwargs: Any) -> None:
621 self.capture(args)
622 self.logger.warning(msg, args, kwargs)
class _StrictTemplateWrapper:
    """Proxy for a template whose ``async_render_to_info`` is always called with ``strict=True``.

    Defined at module level so the class is the same object on every call of
    ``force_strict_template_mode``, which lets an ``undo`` call recognise wrappers
    created by an earlier call.
    """

    def __init__(self, obj: Template) -> None:
        self._obj = obj

    def __getattr__(self, name: str) -> Any:
        if name == "_obj":
            # Only reached when _obj was never set; avoid recursing through this method
            raise AttributeError(name)
        if name == "async_render_to_info":
            return partial(self._obj.async_render_to_info, strict=True)
        return getattr(self._obj, name)

    def __repr__(self) -> str:
        return self._obj.__repr__() if self._obj else "NULL TEMPLATE"


def force_strict_template_mode(conditions: list[ConfigType], undo: bool = False) -> None:
    """Wrap, or with ``undo`` unwrap, the templates held in ``conditions``, in place.

    Each condition dict is walked recursively through nested dicts. Values that are
    ``Template`` instances with an ``_env`` attribute are replaced by a strict-mode
    wrapper; with ``undo`` the wrappers are replaced by the original templates.
    A ``conditions`` of ``None`` is ignored.
    """

    def wrap_template(cond: ConfigType, undo: bool) -> ConfigType:
        for key, val in cond.items():
            if not undo and isinstance(val, Template) and hasattr(val, "_env"):
                cond[key] = _StrictTemplateWrapper(val)
            elif undo and isinstance(val, _StrictTemplateWrapper):
                cond[key] = val._obj
            elif isinstance(val, dict):
                wrap_template(val, undo)
        return cond

    if conditions is not None:
        for cond in conditions:
            wrap_template(cond, undo)
@contextmanager
def trace_action(
    hass: HomeAssistant,
    item_id: str,
    config: dict[str, Any] | None = None,
    context: HomeAssistantContext | None = None,
    stored_traces: int = 5,
) -> Iterator[ActionTrace]:
    """Trace execution of a condition.

    Creates an ``ActionTrace`` for ``item_id``, stores it with ``async_store_trace``
    and yields it. For a truthy ``item_id`` an exception raised inside the block is
    recorded on the trace before being re-raised, and the trace is marked finished on exit.
    """
    trace_context = context or HomeAssistantContext()
    action_trace = ActionTrace(item_id, config, None, trace_context)
    async_store_trace(hass, action_trace, stored_traces)

    try:
        yield action_trace
    except Exception as err:
        if item_id:
            action_trace.set_error(err)
        raise
    finally:
        if item_id:
            action_trace.finished()