Coverage for custom_components/supernotify/common.py: 96%
114 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
1"""Miscellaneous helper functions.
3No same pkg dependencies permitted
4"""
6import datetime as dt
7import logging
8from abc import abstractmethod
9from collections.abc import KeysView
10from dataclasses import dataclass, field
11from enum import Enum
12from typing import TYPE_CHECKING, Any
14from cachetools import TTLCache
16from .const import (
17 ATTR_DUPE_POLICY_MT,
18 ATTR_DUPE_POLICY_MTSLP,
19 ATTR_DUPE_POLICY_NONE,
20 CONF_DUPE_POLICY,
21 CONF_SIZE,
22 CONF_TTL,
23 PRIORITY_VALUES,
24)
26if TYPE_CHECKING:
27 from homeassistant.helpers.typing import ConfigType
29_LOGGER = logging.getLogger(__name__)
31_FALSY_STRINGS = frozenset({"false", "0", "no", "off", ""})
34def safe_get(probably_a_dict: dict[Any, Any] | None, key: Any, default: Any = None) -> Any:
35 probably_a_dict = probably_a_dict or {}
36 return probably_a_dict.get(key, default)
39def safe_extend(target: list[Any], extension: list[Any] | tuple[Any] | Any) -> list[Any]:
40 if target is None:
41 target = []
42 elif not isinstance(target, list):
43 target = [target]
44 if isinstance(extension, list | tuple):
45 target.extend(extension)
46 elif extension:
47 target.append(extension)
48 return target
51def nullable_ensure_list(v: Any) -> list[Any] | None:
52 if v is None:
53 return None
54 return ensure_list(v)
57def ensure_list(v: Any) -> list[Any]:
58 if v is None:
59 return []
60 if isinstance(v, list):
61 return v
62 if isinstance(v, tuple):
63 return list(v)
64 return [v]
67def ensure_dict(v: Any, default: Any = None) -> dict[Any, Any]:
68 if v is None:
69 return {}
70 if isinstance(v, dict):
71 return v
72 if isinstance(v, set | list):
73 return dict.fromkeys(v, default)
74 return {v: default}
77def sanitize(v: Any, minimal: bool = True, top_level_keys_only: bool = False, **kwargs) -> Any:
78 if isinstance(v, dt.datetime | dt.time | dt.date):
79 return v.isoformat()
80 if isinstance(v, str | int | float | bool):
81 return v
82 if isinstance(v, list | KeysView):
83 return [sanitize(vv, minimal=minimal, **kwargs) for vv in v]
84 if isinstance(v, tuple):
85 return (sanitize(vv, minimal=minimal, **kwargs) for vv in v)
86 if isinstance(v, dict):
87 if top_level_keys_only:
88 return [sanitize(k, minimal, **kwargs) for k in v]
89 return {k: sanitize(vv, minimal=minimal, **kwargs) for k, vv in v.items()}
90 if isinstance(v, Enum):
91 return str(v)
92 if isinstance(v, object):
93 if hasattr(v, "contents"):
94 return sanitize(v.contents(**kwargs), minimal=minimal, **kwargs)
95 if hasattr(v, "as_dict"):
96 return v.as_dict(**kwargs)
97 return None
100@dataclass
101class CallRecord:
102 elapsed: float = field()
103 domain: str | None = field(default=None)
104 action: str | None = field(default=None)
105 action_data: dict[str, Any] | None = field(default=None)
106 target_data: dict[str, Any] | None = field(default=None)
107 exception: str | None = field(default=None)
108 debug: bool = field(default=False)
109 service_response: dict[str, Any] | None = field(default=None)
111 def contents(self, **_kwargs: Any) -> dict[str, Any]:
112 result = {
113 "domain": self.domain,
114 "action": self.action,
115 "action_data": self.action_data,
116 "elapsed": self.elapsed,
117 "debug": self.debug,
118 }
119 if self.target_data is not None:
120 result["target_data"] = self.target_data
121 if self.exception is not None:
122 result["exception"] = self.exception
123 if self.service_response is not None:
124 result["service_response"] = self.service_response
125 return result
128class DupeCheckable:
129 id: str
130 priority: str
132 @abstractmethod
133 def hash(self) -> int:
134 raise NotImplementedError
137class DupeChecker:
138 """Apply the duplicate checking policy to duplicate candidates"""
140 def __init__(self, dupe_check_config: ConfigType) -> None:
141 self.policy = dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP)
142 # dupe check cache, key is (priority, message hash)
143 self.cache: TTLCache[tuple[int, int], str] = TTLCache(
144 maxsize=dupe_check_config.get(CONF_SIZE, 100), ttl=dupe_check_config.get(CONF_TTL, 120)
145 )
147 def check(self, dupe_candidate: DupeCheckable) -> bool:
148 if self.policy == ATTR_DUPE_POLICY_NONE:
149 return False
150 hashed: int = dupe_candidate.hash()
151 ranked_priority: int = PRIORITY_VALUES.get(dupe_candidate.priority, 3)
152 if self.policy == ATTR_DUPE_POLICY_MTSLP:
153 dupe: bool = any(prev_hash == hashed and prev_prior >= ranked_priority for prev_hash, prev_prior in self.cache)
154 elif self.policy == ATTR_DUPE_POLICY_MT:
155 dupe = any(prev_hash == hashed for prev_hash, _prev_prior in self.cache)
156 else:
157 dupe = False
158 if dupe:
159 _LOGGER.debug("SUPERNOTIFY Detected dupe: %s", dupe_candidate.id)
160 self.cache[hashed, ranked_priority] = dupe_candidate.id
161 return dupe
164def boolify(value: Any, default: bool) -> bool:
165 """Convert a value to bool, correctly handling string 'false'/'true'.
167 Python's built-in bool() treats any non-empty string as True, so
168 bool("false") == True. This helper avoids that pitfall for values
169 that may arrive as YAML strings or Jinja2 template results.
170 """
171 if value is None:
172 return default
173 if isinstance(value, bool):
174 return value
175 if isinstance(value, str):
176 return value.strip().lower() not in _FALSY_STRINGS
177 return bool(value)