Coverage for custom_components/supernotify/common.py: 34%
106 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
1"""Miscellaneous helper functions.
3No same pkg dependencies permitted
4"""
6import datetime as dt
7import logging
8from abc import abstractmethod
9from collections.abc import KeysView
10from dataclasses import dataclass, field
11from enum import Enum
12from typing import Any
14from cachetools import TTLCache
15from homeassistant.helpers.typing import ConfigType
17from . import (
18 ATTR_DUPE_POLICY_MTSLP,
19 ATTR_DUPE_POLICY_NONE,
20 CONF_DUPE_POLICY,
21 CONF_SIZE,
22 CONF_TTL,
23 PRIORITY_VALUES,
24)
26_LOGGER = logging.getLogger(__name__)
29def safe_get(probably_a_dict: dict[Any, Any] | None, key: Any, default: Any = None) -> Any:
30 probably_a_dict = probably_a_dict or {}
31 return probably_a_dict.get(key, default)
34def safe_extend(target: list[Any], extension: list[Any] | tuple[Any] | Any) -> list[Any]:
35 if target is None:
36 target = []
37 elif not isinstance(target, list):
38 target = [target]
39 if isinstance(extension, list | tuple):
40 target.extend(extension)
41 elif extension:
42 target.append(extension)
43 return target
def nullable_ensure_list(v: Any) -> list[Any] | None:
    """Return ``None`` unchanged, otherwise the result of ``ensure_list(v)``.

    Unlike ``ensure_list``, a ``None`` input stays ``None`` rather than being
    turned into an empty list.
    """
    return None if v is None else ensure_list(v)
def ensure_list(v: Any) -> list[Any]:
    """Coerce ``v`` into a list.

    Returns:
        ``v`` itself when it is already a list (no copy is made), a new list
        built from a tuple's items, an empty list for ``None``, and a
        single-element list containing ``v`` for anything else.
    """
    if isinstance(v, list):
        return v
    if isinstance(v, tuple):
        return list(v)
    return [] if v is None else [v]
def ensure_dict(v: Any, default: Any = None) -> dict[Any, Any]:
    """Coerce ``v`` into a dict.

    Args:
        v: Value to convert.
        default: Value assigned to every key created from a non-dict input.

    Returns:
        ``v`` itself when it is already a dict, an empty dict for ``None``,
        a dict keyed by the members of a set or list, and otherwise a
        single-entry dict keyed by ``v`` (this includes tuples, which are
        used whole as one key).
    """
    if isinstance(v, dict):
        return v
    if v is None:
        return {}
    if isinstance(v, (set, list)):
        return {member: default for member in v}
    return {v: default}
72def sanitize(v: Any, minimal: bool = True, top_level_keys_only: bool = False, **kwargs) -> Any:
73 if isinstance(v, dt.datetime | dt.time | dt.date):
74 return v.isoformat()
75 if isinstance(v, str | int | float | bool):
76 return v
77 if isinstance(v, list | KeysView):
78 return [sanitize(vv, minimal=minimal, **kwargs) for vv in v]
79 if isinstance(v, tuple):
80 return (sanitize(vv, minimal=minimal, **kwargs) for vv in v)
81 if isinstance(v, dict):
82 if top_level_keys_only:
83 return [sanitize(k, minimal, **kwargs) for k in v]
84 return {k: sanitize(vv, minimal=minimal, **kwargs) for k, vv in v.items()}
85 if isinstance(v, Enum):
86 return str(v)
87 if isinstance(v, object):
88 if hasattr(v, "contents"):
89 return sanitize(v.contents(**kwargs), minimal=minimal, **kwargs)
90 if hasattr(v, "as_dict"):
91 return v.as_dict(**kwargs)
92 return None
95@dataclass
96class CallRecord:
97 elapsed: float = field()
98 domain: str | None = field(default=None)
99 action: str | None = field(default=None)
100 action_data: dict[str, Any] | None = field(default=None)
101 target_data: dict[str, Any] | None = field(default=None)
102 exception: str | None = field(default=None)
103 debug: bool = field(default=False)
104 service_response: dict[str, Any] | None = field(default=None)
106 def contents(self, **_kwargs: Any) -> dict[str, Any]:
107 result = {
108 "domain": self.domain,
109 "action": self.action,
110 "action_data": self.action_data,
111 "elapsed": self.elapsed,
112 "debug": self.debug,
113 }
114 if self.target_data is not None:
115 result["target_data"] = self.target_data
116 if self.exception is not None:
117 result["exception"] = self.exception
118 if self.service_response is not None:
119 result["service_response"] = self.service_response
120 return result
class DupeCheckable:
    """Interface for objects that can be checked for duplication.

    Declares an ``id`` and a ``priority`` string attribute and a ``hash``
    method that subclasses are expected to override.

    Note: the class does not derive from ``abc.ABC``, so ``@abstractmethod``
    is not enforced at instantiation time; calling an un-overridden ``hash``
    raises ``NotImplementedError`` instead.
    """

    id: str
    priority: str

    @abstractmethod
    def hash(self) -> int:
        """Return an integer hash for this object; must be overridden."""
        raise NotImplementedError()
class DupeChecker:
    """Detect repeated candidates using a size- and time-bounded cache.

    Every checked candidate is remembered under the key
    ``(hash, ranked priority)``. A later candidate counts as a duplicate when
    an unexpired entry has the same hash and an equal or higher ranked priority.
    """

    def __init__(self, dupe_check_config: ConfigType) -> None:
        """Read policy, cache size and TTL from ``dupe_check_config``.

        Missing settings fall back to the ``ATTR_DUPE_POLICY_MTSLP`` policy,
        a cache size of 100 and a TTL of 120.
        """
        self.policy = dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP)
        # Dupe check cache: key is (message hash, ranked priority), value is the candidate id.
        self.cache: TTLCache[tuple[int, int], str] = TTLCache(
            maxsize=dupe_check_config.get(CONF_SIZE, 100), ttl=dupe_check_config.get(CONF_TTL, 120)
        )

    def check(self, dupe_candidate: DupeCheckable) -> bool:
        """Return ``True`` if ``dupe_candidate`` duplicates a cached entry.

        Always returns ``False`` without touching the cache when the policy is
        ``ATTR_DUPE_POLICY_NONE``. Otherwise the candidate is recorded in the
        cache whether or not it was a duplicate. A priority not found in
        ``PRIORITY_VALUES`` is ranked as 3.
        """
        if self.policy == ATTR_DUPE_POLICY_NONE:
            return False
        hashed: int = dupe_candidate.hash()
        ranked_priority: int = PRIORITY_VALUES.get(dupe_candidate.priority, 3)
        # Stop at the first match; one hit is enough to call it a duplicate.
        dupe = any(
            prev_hash == hashed and prev_prior >= ranked_priority for prev_hash, prev_prior in self.cache
        )
        if dupe:
            _LOGGER.debug("SUPERNOTIFY Detected dupe: %s", dupe_candidate.id)
        self.cache[hashed, ranked_priority] = dupe_candidate.id
        return dupe